001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.Objects;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.locks.ReadWriteLock;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
044import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
045import org.apache.hadoop.hbase.util.CancelableProgressable;
046import org.apache.hadoop.hbase.util.FSUtils;
047import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050
051/**
052 * Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
053 * default, this implementation picks a directory in Hadoop FS based on a combination of
054 * <ul>
055 * <li>the HBase root directory
056 * <li>HConstants.HREGION_LOGDIR_NAME
057 * <li>the given factory's factoryId (usually identifying the regionserver by host:port)
058 * </ul>
059 * It also uses the providerId to differentiate among files.
060 */
061@InterfaceAudience.Private
062@InterfaceStability.Evolving
063public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
064
065  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);
066
067  /** Separate old log into different dir by regionserver name **/
068  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
069  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
070
071  // Only public so classes back in regionserver.wal can access
072  public interface Reader extends WAL.Reader {
073    /**
074     * @param fs File system.
075     * @param path Path.
076     * @param c Configuration.
077     * @param s Input stream that may have been pre-opened by the caller; may be null.
078     */
079    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
080  }
081
082  protected volatile T wal;
083  protected WALFactory factory;
084  protected Configuration conf;
085  protected List<WALActionsListener> listeners = new ArrayList<>();
086  protected String providerId;
087  protected AtomicBoolean initialized = new AtomicBoolean(false);
088  // for default wal provider, logPrefix won't change
089  protected String logPrefix;
090
091  /**
092   * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
093   * missing the newly created WAL, see HBASE-21503 for more details.
094   */
095  private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
096
097  /**
098   * @param factory factory that made us, identity used for FS layout. may not be null
099   * @param conf may not be null
100   * @param providerId differentiate between providers from one factory, used for FS layout. may be
101   *          null
102   */
103  @Override
104  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
105    if (!initialized.compareAndSet(false, true)) {
106      throw new IllegalStateException("WALProvider.init should only be called once.");
107    }
108    this.factory = factory;
109    this.conf = conf;
110    this.providerId = providerId;
111    // get log prefix
112    StringBuilder sb = new StringBuilder().append(factory.factoryId);
113    if (providerId != null) {
114      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
115        sb.append(providerId);
116      } else {
117        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
118      }
119    }
120    logPrefix = sb.toString();
121    doInit(conf);
122  }
123
124  @Override
125  public List<WAL> getWALs() {
126    if (wal != null) {
127      return Lists.newArrayList(wal);
128    }
129    walCreateLock.readLock().lock();
130    try {
131      if (wal == null) {
132        return Collections.emptyList();
133      } else {
134        return Lists.newArrayList(wal);
135      }
136    } finally {
137      walCreateLock.readLock().unlock();
138    }
139  }
140
141  @Override
142  public T getWAL(RegionInfo region) throws IOException {
143    T walCopy = wal;
144    if (walCopy != null) {
145      return walCopy;
146    }
147    walCreateLock.writeLock().lock();
148    try {
149      walCopy = wal;
150      if (walCopy != null) {
151        return walCopy;
152      }
153      walCopy = createWAL();
154      boolean succ = false;
155      try {
156        walCopy.init();
157        succ = true;
158      } finally {
159        if (!succ) {
160          walCopy.close();
161        }
162      }
163      wal = walCopy;
164      return walCopy;
165    } finally {
166      walCreateLock.writeLock().unlock();
167    }
168  }
169
170  protected abstract T createWAL() throws IOException;
171
172  protected abstract void doInit(Configuration conf) throws IOException;
173
174  @Override
175  public void shutdown() throws IOException {
176    T log = this.wal;
177    if (log != null) {
178      log.shutdown();
179    }
180  }
181
182  @Override
183  public void close() throws IOException {
184    T log = this.wal;
185    if (log != null) {
186      log.close();
187    }
188  }
189
190  /**
191   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
192   * number of files (rolled and active). if either of them aren't, count 0 for that provider.
193   */
194  @Override
195  public long getNumLogFiles() {
196    T log = this.wal;
197    return log == null ? 0 : log.getNumLogFiles();
198  }
199
200  /**
201   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
202   * size of files (only rolled). if either of them aren't, count 0 for that provider.
203   */
204  @Override
205  public long getLogFileSize() {
206    T log = this.wal;
207    return log == null ? 0 : log.getLogFileSize();
208  }
209
210  /**
211   * returns the number of rolled WAL files.
212   */
213  @VisibleForTesting
214  public static int getNumRolledLogFiles(WAL wal) {
215    return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
216  }
217
218  /**
219   * returns the size of rolled WAL files.
220   */
221  @VisibleForTesting
222  public static long getLogFileSize(WAL wal) {
223    return ((AbstractFSWAL<?>) wal).getLogFileSize();
224  }
225
226  /**
227   * return the current filename from the current wal.
228   */
229  @VisibleForTesting
230  public static Path getCurrentFileName(final WAL wal) {
231    return ((AbstractFSWAL<?>) wal).getCurrentFileName();
232  }
233
234  /**
235   * request a log roll, but don't actually do it.
236   */
237  @VisibleForTesting
238  static void requestLogRoll(final WAL wal) {
239    ((AbstractFSWAL<?>) wal).requestLogRoll();
240  }
241
242  // should be package private; more visible for use in AbstractFSWAL
243  public static final String WAL_FILE_NAME_DELIMITER = ".";
244  /** The hbase:meta region's WAL filename extension */
245  @VisibleForTesting
246  public static final String META_WAL_PROVIDER_ID = ".meta";
247  static final String DEFAULT_PROVIDER_ID = "default";
248
249  // Implementation details that currently leak in tests or elsewhere follow
250  /** File Extension used while splitting an WAL into regions (HBASE-2312) */
251  public static final String SPLITTING_EXT = "-splitting";
252
253  /**
254   * It returns the file create timestamp from the file name. For name format see
255   * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal
256   * @param wal must not be null
257   * @return the file number that is part of the WAL file name
258   */
259  @VisibleForTesting
260  public static long extractFileNumFromWAL(final WAL wal) {
261    final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName();
262    if (walName == null) {
263      throw new IllegalArgumentException("The WAL path couldn't be null");
264    }
265    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(walName.getName());
266    if (matcher.matches()) {
267      return Long.parseLong(matcher.group(2));
268    } else {
269      throw new IllegalArgumentException(walName.getName() + " is not a valid wal file name");
270    }
271  }
272
273  /**
274   * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
275   * description.
276   */
277  private static final Pattern WAL_FILE_NAME_PATTERN =
278    Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?");
279
280  /**
281   * A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
282   * &lt;file-creation-timestamp&gt;[.&lt;suffix&gt;]. provider-name is usually made up of a
283   * server-name and a provider-id
284   * @param filename name of the file to validate
285   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
286   */
287  public static boolean validateWALFilename(String filename) {
288    return WAL_FILE_NAME_PATTERN.matcher(filename).matches();
289  }
290
291  /**
292   * Construct the directory name for all WALs on a given server. Dir names currently look like this
293   * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
294   * @param serverName Server name formatted as described in {@link ServerName}
295   * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
296   *         <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
297   */
298  public static String getWALDirectoryName(final String serverName) {
299    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
300    dirName.append("/");
301    dirName.append(serverName);
302    return dirName.toString();
303  }
304
305  /**
306   * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
307   * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
308   * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
309   * @param conf
310   * @param serverName Server name formatted as described in {@link ServerName}
311   * @return the relative WAL directory name
312   */
313  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
314    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
315    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
316      dirName.append(Path.SEPARATOR);
317      dirName.append(serverName);
318    }
319    return dirName.toString();
320  }
321
322  /**
323   * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
324   * this method ignores the format of the logfile component. Current format: [base directory for
325   * hbase]/hbase/.logs/ServerName/logfile or [base directory for
326   * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and
327   * server-specific directories.
328   * @return null if it's not a log file. Returns the ServerName of the region server that created
329   *         this log file otherwise.
330   */
331  public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
332      throws IOException {
333    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
334      return null;
335    }
336
337    if (conf == null) {
338      throw new IllegalArgumentException("parameter conf must be set");
339    }
340
341    final String rootDir = conf.get(HConstants.HBASE_DIR);
342    if (rootDir == null || rootDir.isEmpty()) {
343      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
344    }
345
346    final StringBuilder startPathSB = new StringBuilder(rootDir);
347    if (!rootDir.endsWith("/")) {
348      startPathSB.append('/');
349    }
350    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
351    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
352      startPathSB.append('/');
353    }
354    final String startPath = startPathSB.toString();
355
356    String fullPath;
357    try {
358      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
359    } catch (IllegalArgumentException e) {
360      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
361      return null;
362    }
363
364    if (!fullPath.startsWith(startPath)) {
365      return null;
366    }
367
368    final String serverNameAndFile = fullPath.substring(startPath.length());
369
370    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
371      // Either it's a file (not a directory) or it's not a ServerName format
372      return null;
373    }
374
375    Path p = new Path(path);
376    return getServerNameFromWALDirectoryName(p);
377  }
378
379  /**
380   * This function returns region server name from a log file name which is in one of the following
381   * formats:
382   * <ul>
383   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;-splitting/...</li>
384   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
385   * </ul>
386   * @return null if the passed in logFile isn't a valid WAL file path
387   */
388  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
389    String logDirName = logFile.getParent().getName();
390    // We were passed the directory and not a file in it.
391    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
392      logDirName = logFile.getName();
393    }
394    ServerName serverName = null;
395    if (logDirName.endsWith(SPLITTING_EXT)) {
396      logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
397    }
398    try {
399      serverName = ServerName.parseServerName(logDirName);
400    } catch (IllegalArgumentException | IllegalStateException ex) {
401      serverName = null;
402      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
403    }
404    if (serverName != null && serverName.getStartcode() < 0) {
405      LOG.warn("Invalid log file path=" + logFile);
406      serverName = null;
407    }
408    return serverName;
409  }
410
411  public static boolean isMetaFile(Path p) {
412    return isMetaFile(p.getName());
413  }
414
415  public static boolean isMetaFile(String p) {
416    if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
417      return true;
418    }
419    return false;
420  }
421
422  public static boolean isArchivedLogFile(Path p) {
423    String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
424    return p.toString().contains(oldLog);
425  }
426
427  /**
428   * Get the archived WAL file path
429   * @param path - active WAL file path
430   * @param conf - configuration
431   * @return archived path if exists, path - otherwise
432   * @throws IOException exception
433   */
434  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
435    Path rootDir = FSUtils.getWALRootDir(conf);
436    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
437    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
438      ServerName serverName = getServerNameFromWALDirectoryName(path);
439      if (serverName == null) {
440        LOG.error("Couldn't locate log: " + path);
441        return path;
442      }
443      oldLogDir = new Path(oldLogDir, serverName.getServerName());
444    }
445    Path archivedLogLocation = new Path(oldLogDir, path.getName());
446    final FileSystem fs = FSUtils.getWALFileSystem(conf);
447
448    if (fs.exists(archivedLogLocation)) {
449      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
450      return archivedLogLocation;
451    } else {
452      LOG.error("Couldn't locate log: " + path);
453      return path;
454    }
455  }
456
457  /**
458   * Opens WAL reader with retries and additional exception handling
459   * @param path path to WAL file
460   * @param conf configuration
461   * @return WAL Reader instance
462   * @throws IOException
463   */
464  public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
465      throws IOException
466
467  {
468    long retryInterval = 2000; // 2 sec
469    int maxAttempts = 30;
470    int attempt = 0;
471    Exception ee = null;
472    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
473    while (reader == null && attempt++ < maxAttempts) {
474      try {
475        // Detect if this is a new file, if so get a new reader else
476        // reset the current reader so that we see the new data
477        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
478        return reader;
479      } catch (FileNotFoundException fnfe) {
480        // If the log was archived, continue reading from there
481        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
482        if (!Objects.equals(path, archivedLog)) {
483          return openReader(archivedLog, conf);
484        } else {
485          throw fnfe;
486        }
487      } catch (LeaseNotRecoveredException lnre) {
488        // HBASE-15019 the WAL was not closed due to some hiccup.
489        LOG.warn("Try to recover the WAL lease " + path, lnre);
490        recoverLease(conf, path);
491        reader = null;
492        ee = lnre;
493      } catch (NullPointerException npe) {
494        // Workaround for race condition in HDFS-4380
495        // which throws a NPE if we open a file before any data node has the most recent block
496        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
497        LOG.warn("Got NPE opening reader, will retry.");
498        reader = null;
499        ee = npe;
500      }
501      if (reader == null) {
502        // sleep before next attempt
503        try {
504          Thread.sleep(retryInterval);
505        } catch (InterruptedException e) {
506        }
507      }
508    }
509    throw new IOException("Could not open reader", ee);
510  }
511
512  // For HBASE-15019
513  private static void recoverLease(final Configuration conf, final Path path) {
514    try {
515      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
516      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
517      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
518        @Override
519        public boolean progress() {
520          LOG.debug("Still trying to recover WAL lease: " + path);
521          return true;
522        }
523      });
524    } catch (IOException e) {
525      LOG.warn("unable to recover lease for WAL: " + path, e);
526    }
527  }
528
529  @Override
530  public void addWALActionsListener(WALActionsListener listener) {
531    listeners.add(listener);
532  }
533
534  private static String getWALNameGroupFromWALName(String name, int group) {
535    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
536    if (matcher.matches()) {
537      return matcher.group(group);
538    } else {
539      throw new IllegalArgumentException(name + " is not a valid wal file name");
540    }
541  }
542  /**
543   * Get prefix of the log from its name, assuming WAL name in format of
544   * log_prefix.filenumber.log_suffix
545   * @param name Name of the WAL to parse
546   * @return prefix of the log
547   * @throws IllegalArgumentException if the name passed in is not a valid wal file name
548   * @see AbstractFSWAL#getCurrentFileName()
549   */
550  public static String getWALPrefixFromWALName(String name) {
551    return getWALNameGroupFromWALName(name, 1);
552  }
553
554  public static long getWALStartTimeFromWALName(String name) {
555    return Long.parseLong(getWALNameGroupFromWALName(name, 2));
556  }
557}