001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.Objects;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.locks.ReadWriteLock;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataInputStream;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
039import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
040import org.apache.hadoop.hbase.util.CancelableProgressable;
041import org.apache.hadoop.hbase.util.CommonFSUtils;
042import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
043import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.apache.yetus.audience.InterfaceStability;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050
051/**
052 * Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
053 * default, this implementation picks a directory in Hadoop FS based on a combination of
054 * <ul>
055 * <li>the HBase root directory
056 * <li>HConstants.HREGION_LOGDIR_NAME
057 * <li>the given factory's factoryId (usually identifying the regionserver by host:port)
058 * </ul>
059 * It also uses the providerId to differentiate among files.
060 */
061@InterfaceAudience.Private
062@InterfaceStability.Evolving
063public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
064
  // Logger shared by the static helpers and instance methods of this class.
  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);

  /**
   * Configuration key: when set to true, the archive directory name built by
   * {@link #getWALArchiveDirectoryName(Configuration, String)} gets a per-regionserver
   * sub-directory appended instead of being the plain old-WAL directory.
   */
  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
  /** Default value for {@link #SEPARATE_OLDLOGDIR}: old WALs are not separated by regionserver. */
  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
070
  // Only public so classes back in regionserver.wal can access
  /**
   * A {@link WAL.Reader} that can additionally be initialized against a file in a file system.
   */
  public interface Reader extends WAL.Reader {
    /**
     * Initializes this reader for the given file.
     * @param fs File system.
     * @param path Path.
     * @param c Configuration.
     * @param s Input stream that may have been pre-opened by the caller; may be null.
     * @throws IOException presumably when the file cannot be opened or read; the exact conditions
     *           are defined by the implementation
     */
    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
  }
081
  /**
   * The single WAL handed out by this provider. Assigned by {@link #getWAL(RegionInfo)} after the
   * WAL has been created and initialized; null before that. Volatile so it can be read without
   * holding {@link #walCreateLock}.
   */
  protected volatile T wal;
  /** Factory passed to {@code init}; its factoryId is the first component of {@link #logPrefix}. */
  protected WALFactory factory;
  /** Configuration passed to {@code init}. */
  protected Configuration conf;
  /** Listeners registered through {@link #addWALActionsListener(WALActionsListener)}. */
  protected List<WALActionsListener> listeners = new ArrayList<>();
  /** Provider id passed to {@code init}; may be null. */
  protected String providerId;
  /** Set to true by the first {@code init} call; a second call fails. */
  protected AtomicBoolean initialized = new AtomicBoolean(false);
  // for default wal provider, logPrefix won't change
  /**
   * Factory id followed, when a provider id was given, by the provider id (preceded by
   * {@link #WAL_FILE_NAME_DELIMITER} unless the id already starts with it). Computed in
   * {@code init}.
   */
  protected String logPrefix;

  /**
   * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
   * missing the newly created WAL, see HBASE-21503 for more details. WAL creation takes the write
   * lock; {@link #getWALs()} takes the read lock when it does not yet see a WAL.
   */
  private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
096
097  /**
098   * @param factory factory that made us, identity used for FS layout. may not be null
099   * @param conf may not be null
100   * @param providerId differentiate between providers from one factory, used for FS layout. may be
101   *          null
102   */
103  @Override
104  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
105    if (!initialized.compareAndSet(false, true)) {
106      throw new IllegalStateException("WALProvider.init should only be called once.");
107    }
108    this.factory = factory;
109    this.conf = conf;
110    this.providerId = providerId;
111    // get log prefix
112    StringBuilder sb = new StringBuilder().append(factory.factoryId);
113    if (providerId != null) {
114      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
115        sb.append(providerId);
116      } else {
117        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
118      }
119    }
120    logPrefix = sb.toString();
121    doInit(conf);
122  }
123
124  @Override
125  public List<WAL> getWALs() {
126    if (wal != null) {
127      return Lists.newArrayList(wal);
128    }
129    walCreateLock.readLock().lock();
130    try {
131      if (wal == null) {
132        return Collections.emptyList();
133      } else {
134        return Lists.newArrayList(wal);
135      }
136    } finally {
137      walCreateLock.readLock().unlock();
138    }
139  }
140
141  @Override
142  public T getWAL(RegionInfo region) throws IOException {
143    T walCopy = wal;
144    if (walCopy != null) {
145      return walCopy;
146    }
147    walCreateLock.writeLock().lock();
148    try {
149      walCopy = wal;
150      if (walCopy != null) {
151        return walCopy;
152      }
153      walCopy = createWAL();
154      boolean succ = false;
155      try {
156        walCopy.init();
157        succ = true;
158      } finally {
159        if (!succ) {
160          walCopy.close();
161        }
162      }
163      wal = walCopy;
164      return walCopy;
165    } finally {
166      walCreateLock.writeLock().unlock();
167    }
168  }
169
  /**
   * Creates the WAL instance served by this provider. Called from {@link #getWAL(RegionInfo)}
   * while the write side of the creation lock is held; the caller then invokes {@code init()} on
   * the returned instance and closes it if that fails.
   * @return a new WAL instance; the caller dereferences it immediately
   * @throws IOException if the WAL cannot be created
   */
  protected abstract T createWAL() throws IOException;
171
  /**
   * Subclass hook invoked as the last step of {@code init}, after the factory, configuration,
   * provider id and log prefix fields have been set.
   * @param conf the configuration that was passed to {@code init}
   * @throws IOException if subclass initialization fails
   */
  protected abstract void doInit(Configuration conf) throws IOException;
173
174  @Override
175  public void shutdown() throws IOException {
176    T log = this.wal;
177    if (log != null) {
178      log.shutdown();
179    }
180  }
181
182  @Override
183  public void close() throws IOException {
184    T log = this.wal;
185    if (log != null) {
186      log.close();
187    }
188  }
189
190  /**
191   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
192   * number of files (rolled and active). if either of them aren't, count 0 for that provider.
193   */
194  @Override
195  public long getNumLogFiles() {
196    T log = this.wal;
197    return log == null ? 0 : log.getNumLogFiles();
198  }
199
200  /**
201   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
202   * size of files (only rolled). if either of them aren't, count 0 for that provider.
203   */
204  @Override
205  public long getLogFileSize() {
206    T log = this.wal;
207    return log == null ? 0 : log.getLogFileSize();
208  }
209
210  /**
211   * returns the number of rolled WAL files.
212   */
213  @VisibleForTesting
214  public static int getNumRolledLogFiles(WAL wal) {
215    return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
216  }
217
218  /**
219   * returns the size of rolled WAL files.
220   */
221  @VisibleForTesting
222  public static long getLogFileSize(WAL wal) {
223    return ((AbstractFSWAL<?>) wal).getLogFileSize();
224  }
225
226  /**
227   * return the current filename from the current wal.
228   */
229  @VisibleForTesting
230  public static Path getCurrentFileName(final WAL wal) {
231    return ((AbstractFSWAL<?>) wal).getCurrentFileName();
232  }
233
234  /**
235   * request a log roll, but don't actually do it.
236   */
237  @VisibleForTesting
238  static void requestLogRoll(final WAL wal) {
239    ((AbstractFSWAL<?>) wal).requestLogRoll();
240  }
241
  // should be package private; more visible for use in AbstractFSWAL
  /**
   * Delimiter between the components of a WAL file name; {@code init} also uses it to join the
   * factory id and the provider id into the log prefix.
   */
  public static final String WAL_FILE_NAME_DELIMITER = ".";
  /**
   * The hbase:meta region's WAL filename extension; {@link #isMetaFile(String)} tests for this
   * suffix.
   */
  @VisibleForTesting
  public static final String META_WAL_PROVIDER_ID = ".meta";
  // Not referenced within this class; presumably the provider id used when none is configured.
  static final String DEFAULT_PROVIDER_ID = "default";

  // Implementation details that currently leak in tests or elsewhere follow
  /**
   * File Extension used while splitting an WAL into regions (HBASE-2312). Stripped from the
   * directory name before a server name is parsed out of it.
   */
  public static final String SPLITTING_EXT = "-splitting";
252
253  /**
254   * It returns the file create timestamp from the file name. For name format see
255   * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal
256   * @param wal must not be null
257   * @return the file number that is part of the WAL file name
258   */
259  @VisibleForTesting
260  public static long extractFileNumFromWAL(final WAL wal) {
261    final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName();
262    if (walName == null) {
263      throw new IllegalArgumentException("The WAL path couldn't be null");
264    }
265    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(walName.getName());
266    if (matcher.matches()) {
267      return Long.parseLong(matcher.group(2));
268    } else {
269      throw new IllegalArgumentException(walName.getName() + " is not a valid wal file name");
270    }
271  }
272
273  /**
274   * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
275   * description.
276   */
277  private static final Pattern WAL_FILE_NAME_PATTERN =
278    Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?");
279
280  /**
281   * A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
282   * &lt;file-creation-timestamp&gt;[.&lt;suffix&gt;]. provider-name is usually made up of a
283   * server-name and a provider-id
284   * @param filename name of the file to validate
285   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
286   */
287  public static boolean validateWALFilename(String filename) {
288    return WAL_FILE_NAME_PATTERN.matcher(filename).matches();
289  }
290
291  /**
292   * Construct the directory name for all WALs on a given server. Dir names currently look like this
293   * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
294   * @param serverName Server name formatted as described in {@link ServerName}
295   * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
296   *         <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
297   */
298  public static String getWALDirectoryName(final String serverName) {
299    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
300    dirName.append("/");
301    dirName.append(serverName);
302    return dirName.toString();
303  }
304
305  /**
306   * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
307   * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
308   * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
309   * @param serverName Server name formatted as described in {@link ServerName}
310   * @return the relative WAL directory name
311   */
312  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
313    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
314    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
315      dirName.append(Path.SEPARATOR);
316      dirName.append(serverName);
317    }
318    return dirName.toString();
319  }
320
321  /**
322   * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
323   * this method ignores the format of the logfile component. Current format: [base directory for
324   * hbase]/hbase/.logs/ServerName/logfile or [base directory for
325   * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and
326   * server-specific directories.
327   * @return null if it's not a log file. Returns the ServerName of the region server that created
328   *         this log file otherwise.
329   */
330  public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
331      throws IOException {
332    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
333      return null;
334    }
335
336    if (conf == null) {
337      throw new IllegalArgumentException("parameter conf must be set");
338    }
339
340    final String rootDir = conf.get(HConstants.HBASE_DIR);
341    if (rootDir == null || rootDir.isEmpty()) {
342      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
343    }
344
345    final StringBuilder startPathSB = new StringBuilder(rootDir);
346    if (!rootDir.endsWith("/")) {
347      startPathSB.append('/');
348    }
349    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
350    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
351      startPathSB.append('/');
352    }
353    final String startPath = startPathSB.toString();
354
355    String fullPath;
356    try {
357      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
358    } catch (IllegalArgumentException e) {
359      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
360      return null;
361    }
362
363    if (!fullPath.startsWith(startPath)) {
364      return null;
365    }
366
367    final String serverNameAndFile = fullPath.substring(startPath.length());
368
369    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
370      // Either it's a file (not a directory) or it's not a ServerName format
371      return null;
372    }
373
374    Path p = new Path(path);
375    return getServerNameFromWALDirectoryName(p);
376  }
377
378  /**
379   * This function returns region server name from a log file name which is in one of the following
380   * formats:
381   * <ul>
382   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;-splitting/...</li>
383   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
384   * </ul>
385   * @return null if the passed in logFile isn't a valid WAL file path
386   */
387  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
388    String logDirName = logFile.getParent().getName();
389    // We were passed the directory and not a file in it.
390    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
391      logDirName = logFile.getName();
392    }
393    ServerName serverName = null;
394    if (logDirName.endsWith(SPLITTING_EXT)) {
395      logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
396    }
397    try {
398      serverName = ServerName.parseServerName(logDirName);
399    } catch (IllegalArgumentException | IllegalStateException ex) {
400      serverName = null;
401      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
402    }
403    if (serverName != null && serverName.getStartcode() < 0) {
404      LOG.warn("Invalid log file path=" + logFile);
405      serverName = null;
406    }
407    return serverName;
408  }
409
410  public static boolean isMetaFile(Path p) {
411    return isMetaFile(p.getName());
412  }
413
414  /**
415   * @return True if String ends in {@link #META_WAL_PROVIDER_ID}
416   */
417  public static boolean isMetaFile(String p) {
418    return p != null && p.endsWith(META_WAL_PROVIDER_ID);
419  }
420
421  public static boolean isArchivedLogFile(Path p) {
422    String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
423    return p.toString().contains(oldLog);
424  }
425
426  /**
427   * Get the archived WAL file path
428   * @param path - active WAL file path
429   * @param conf - configuration
430   * @return archived path if exists, path - otherwise
431   * @throws IOException exception
432   */
433  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
434    Path rootDir = CommonFSUtils.getWALRootDir(conf);
435    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
436    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
437      ServerName serverName = getServerNameFromWALDirectoryName(path);
438      if (serverName == null) {
439        LOG.error("Couldn't locate log: " + path);
440        return path;
441      }
442      oldLogDir = new Path(oldLogDir, serverName.getServerName());
443    }
444    Path archivedLogLocation = new Path(oldLogDir, path.getName());
445    final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
446
447    if (fs.exists(archivedLogLocation)) {
448      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
449      return archivedLogLocation;
450    } else {
451      LOG.error("Couldn't locate log: " + path);
452      return path;
453    }
454  }
455
456  /**
457   * Opens WAL reader with retries and additional exception handling
458   * @param path path to WAL file
459   * @param conf configuration
460   * @return WAL Reader instance
461   */
462  public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
463      throws IOException {
464    long retryInterval = 2000; // 2 sec
465    int maxAttempts = 30;
466    int attempt = 0;
467    Exception ee = null;
468    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
469    while (reader == null && attempt++ < maxAttempts) {
470      try {
471        // Detect if this is a new file, if so get a new reader else
472        // reset the current reader so that we see the new data
473        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
474        return reader;
475      } catch (FileNotFoundException fnfe) {
476        // If the log was archived, continue reading from there
477        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
478        if (!Objects.equals(path, archivedLog)) {
479          return openReader(archivedLog, conf);
480        } else {
481          throw fnfe;
482        }
483      } catch (LeaseNotRecoveredException lnre) {
484        // HBASE-15019 the WAL was not closed due to some hiccup.
485        LOG.warn("Try to recover the WAL lease " + path, lnre);
486        recoverLease(conf, path);
487        reader = null;
488        ee = lnre;
489      } catch (NullPointerException npe) {
490        // Workaround for race condition in HDFS-4380
491        // which throws a NPE if we open a file before any data node has the most recent block
492        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
493        LOG.warn("Got NPE opening reader, will retry.");
494        reader = null;
495        ee = npe;
496      }
497      if (reader == null) {
498        // sleep before next attempt
499        try {
500          Thread.sleep(retryInterval);
501        } catch (InterruptedException e) {
502        }
503      }
504    }
505    throw new IOException("Could not open reader", ee);
506  }
507
508  // For HBASE-15019
509  private static void recoverLease(final Configuration conf, final Path path) {
510    try {
511      final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
512      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
513        @Override
514        public boolean progress() {
515          LOG.debug("Still trying to recover WAL lease: " + path);
516          return true;
517        }
518      });
519    } catch (IOException e) {
520      LOG.warn("unable to recover lease for WAL: " + path, e);
521    }
522  }
523
524  @Override
525  public void addWALActionsListener(WALActionsListener listener) {
526    listeners.add(listener);
527  }
528
529  private static String getWALNameGroupFromWALName(String name, int group) {
530    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
531    if (matcher.matches()) {
532      return matcher.group(group);
533    } else {
534      throw new IllegalArgumentException(name + " is not a valid wal file name");
535    }
536  }
537  /**
538   * Get prefix of the log from its name, assuming WAL name in format of
539   * log_prefix.filenumber.log_suffix
540   * @param name Name of the WAL to parse
541   * @return prefix of the log
542   * @throws IllegalArgumentException if the name passed in is not a valid wal file name
543   * @see AbstractFSWAL#getCurrentFileName()
544   */
545  public static String getWALPrefixFromWALName(String name) {
546    return getWALNameGroupFromWALName(name, 1);
547  }
548
549  public static long getWALStartTimeFromWALName(String name) {
550    return Long.parseLong(getWALNameGroupFromWALName(name, 2));
551  }
552}