001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.List;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.locks.ReadWriteLock;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
040import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
041import org.apache.hadoop.hbase.util.CancelableProgressable;
042import org.apache.hadoop.hbase.util.CommonFSUtils;
043import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
044import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.apache.yetus.audience.InterfaceStability;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052/**
053 * Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
054 * default, this implementation picks a directory in Hadoop FS based on a combination of
055 * <ul>
056 * <li>the HBase root directory
057 * <li>HConstants.HREGION_LOGDIR_NAME
058 * <li>the given factory's factoryId (usually identifying the regionserver by host:port)
059 * </ul>
060 * It also uses the providerId to differentiate among files.
061 */
062@InterfaceAudience.Private
063@InterfaceStability.Evolving
064public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
065
  // Class-wide SLF4J logger, shared by the instance methods and the static helpers below.
  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);

  /**
   * Configuration key. When set to true,
   * {@link #getWALArchiveDirectoryName(Configuration, String)} appends the server name to the old
   * WAL directory, so that archived WALs are separated by regionserver.
   */
  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
  /** Default value of {@link #SEPARATE_OLDLOGDIR}: old WALs are not separated by regionserver. */
  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
071
  // Only public so classes back in regionserver.wal can access
  /**
   * A {@link WAL.Reader} that additionally declares an explicit {@code init} step taking the file
   * system, path, configuration and an optional pre-opened stream.
   */
  public interface Reader extends WAL.Reader {
    /**
     * Initialization hook for the reader. NOTE(review): the exact contract is defined by the
     * implementations, which are not visible in this file.
     * @param fs   File system.
     * @param path Path.
     * @param c    Configuration.
     * @param s    Input stream that may have been pre-opened by the caller; may be null.
     * @throws IOException declared by the contract; thrown by implementations
     */
    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
  }
082
  // The single WAL served by this provider. Null until getWAL creates and initializes it; read
  // without the lock on the fast paths of getWAL/getWALs, hence volatile.
  protected volatile T wal;
  // Factory passed to init; its factoryId forms the leading part of logPrefix.
  protected WALFactory factory;
  // Configuration passed to init.
  protected Configuration conf;
  // Listeners registered through addWALActionsListener.
  protected List<WALActionsListener> listeners = new ArrayList<>();
  // Provider id passed to init (may be null); appended to the factory id when building logPrefix.
  protected String providerId;
  // Set to true by the first init call; any further init call fails with IllegalStateException.
  protected AtomicBoolean initialized = new AtomicBoolean(false);
  // for default wal provider, logPrefix won't change
  protected String logPrefix;
  // Abortable passed to init. Not used in this class; presumably consumed by subclasses.
  protected Abortable abortable;

  /**
   * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
   * missing the newly created WAL, see HBASE-21503 for more details.
   */
  private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
098
099  /**
100   * @param factory    factory that made us, identity used for FS layout. may not be null
101   * @param conf       may not be null
102   * @param providerId differentiate between providers from one factory, used for FS layout. may be
103   *                   null
104   */
105  @Override
106  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
107    throws IOException {
108    if (!initialized.compareAndSet(false, true)) {
109      throw new IllegalStateException("WALProvider.init should only be called once.");
110    }
111    this.factory = factory;
112    this.conf = conf;
113    this.providerId = providerId;
114    // get log prefix
115    StringBuilder sb = new StringBuilder().append(factory.factoryId);
116    if (providerId != null) {
117      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
118        sb.append(providerId);
119      } else {
120        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
121      }
122    }
123    logPrefix = sb.toString();
124    this.abortable = abortable;
125    doInit(conf);
126  }
127
128  @Override
129  public List<WAL> getWALs() {
130    if (wal != null) {
131      return Lists.newArrayList(wal);
132    }
133    walCreateLock.readLock().lock();
134    try {
135      if (wal == null) {
136        return Collections.emptyList();
137      } else {
138        return Lists.newArrayList(wal);
139      }
140    } finally {
141      walCreateLock.readLock().unlock();
142    }
143  }
144
  /**
   * Returns the single WAL of this provider, creating and initializing it on first use.
   * <p>
   * The {@code region} argument is not consulted by this implementation: every region gets the
   * same WAL instance.
   * @param region the region asking for a WAL (unused here)
   * @return the shared WAL instance
   * @throws IOException                                  if creating or initializing the WAL
   *                                                      fails
   * @throws FailedCloseWALAfterInitializedErrorException if initialization failed and closing the
   *                                                      half-initialized WAL failed as well
   */
  @Override
  public T getWAL(RegionInfo region) throws IOException {
    // Fast path: the WAL has already been published, no locking needed.
    T walCopy = wal;
    if (walCopy != null) {
      return walCopy;
    }
    walCreateLock.writeLock().lock();
    try {
      // Re-check under the write lock: another thread may have created the WAL in the meantime.
      walCopy = wal;
      if (walCopy != null) {
        return walCopy;
      }
      walCopy = createWAL();
      boolean succ = false;
      try {
        walCopy.init();
        succ = true;
      } finally {
        if (!succ) {
          // init() threw: release whatever the WAL acquired. If close() succeeds the original
          // init failure keeps propagating; if close() fails too, that failure is reported
          // instead.
          try {
            walCopy.close();
          } catch (Throwable t) {
            throw new FailedCloseWALAfterInitializedErrorException(
              "Failed close after init wal failed.", t);
          }
        }
      }
      // Publish only a fully initialized WAL.
      wal = walCopy;
      return walCopy;
    } finally {
      walCreateLock.writeLock().unlock();
    }
  }
178
  /**
   * Creates the WAL instance for this provider. Called by {@link #getWAL(RegionInfo)} while the
   * creation write lock is held; the caller invokes {@code init()} on the returned instance, so
   * implementations return it un-initialized.
   */
  protected abstract T createWAL() throws IOException;

  /**
   * Subclass hook invoked as the last step of
   * {@link #init(WALFactory, Configuration, String, Abortable)}, after the fields of this class
   * have been populated.
   * @param conf the configuration passed to {@code init}
   */
  protected abstract void doInit(Configuration conf) throws IOException;
182
183  @Override
184  public void shutdown() throws IOException {
185    T log = this.wal;
186    if (log != null) {
187      log.shutdown();
188    }
189  }
190
191  @Override
192  public void close() throws IOException {
193    T log = this.wal;
194    if (log != null) {
195      log.close();
196    }
197  }
198
199  /**
200   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
201   * number of files (rolled and active). if either of them aren't, count 0 for that provider.
202   */
203  @Override
204  public long getNumLogFiles() {
205    T log = this.wal;
206    return log == null ? 0 : log.getNumLogFiles();
207  }
208
209  /**
210   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
211   * size of files (only rolled). if either of them aren't, count 0 for that provider.
212   */
213  @Override
214  public long getLogFileSize() {
215    T log = this.wal;
216    return log == null ? 0 : log.getLogFileSize();
217  }
218
219  /**
220   * returns the number of rolled WAL files.
221   */
222  public static int getNumRolledLogFiles(WAL wal) {
223    return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
224  }
225
226  /**
227   * returns the size of rolled WAL files.
228   */
229  public static long getLogFileSize(WAL wal) {
230    return ((AbstractFSWAL<?>) wal).getLogFileSize();
231  }
232
233  /**
234   * return the current filename from the current wal.
235   */
236  public static Path getCurrentFileName(final WAL wal) {
237    return ((AbstractFSWAL<?>) wal).getCurrentFileName();
238  }
239
240  /**
241   * request a log roll, but don't actually do it.
242   */
243  static void requestLogRoll(final WAL wal) {
244    ((AbstractFSWAL<?>) wal).requestLogRoll();
245  }
246
  // should be package private; more visible for use in AbstractFSWAL
  /** Separator between the parts of a WAL file name (prefix, file number, optional suffix). */
  public static final String WAL_FILE_NAME_DELIMITER = ".";
  /** The hbase:meta region's WAL filename extension */
  public static final String META_WAL_PROVIDER_ID = ".meta";
  // Not referenced in this class; presumably the provider id used when none is given. TODO
  // confirm against callers.
  static final String DEFAULT_PROVIDER_ID = "default";

  // Implementation details that currently leak in tests or elsewhere follow
  /** File Extension used while splitting an WAL into regions (HBASE-2312) */
  public static final String SPLITTING_EXT = "-splitting";
256
257  /**
258   * It returns the file create timestamp from the file name. For name format see
259   * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal
260   * @param wal must not be null
261   * @return the file number that is part of the WAL file name
262   */
263  public static long extractFileNumFromWAL(final WAL wal) {
264    final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName();
265    if (walName == null) {
266      throw new IllegalArgumentException("The WAL path couldn't be null");
267    }
268    final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
269    return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]);
270  }
271
272  /**
273   * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
274   * description.
275   */
276  private static final Pattern pattern =
277    Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*");
278
279  /**
280   * A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
281   * &lt;file-creation-timestamp&gt;[.meta]. provider-name is usually made up of a server-name and a
282   * provider-id
283   * @param filename name of the file to validate
284   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
285   */
286  public static boolean validateWALFilename(String filename) {
287    return pattern.matcher(filename).matches();
288  }
289
290  /**
291   * Construct the directory name for all WALs on a given server. Dir names currently look like this
292   * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
293   * @param serverName Server name formatted as described in {@link ServerName}
294   * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
295   *         <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
296   */
297  public static String getWALDirectoryName(final String serverName) {
298    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
299    dirName.append("/");
300    dirName.append(serverName);
301    return dirName.toString();
302  }
303
304  /**
305   * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
306   * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
307   * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
308   * @param serverName Server name formatted as described in {@link ServerName}
309   * @return the relative WAL directory name
310   */
311  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
312    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
313    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
314      dirName.append(Path.SEPARATOR);
315      dirName.append(serverName);
316    }
317    return dirName.toString();
318  }
319
320  /**
321   * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
322   * this method ignores the format of the logfile component. Current format: [base directory for
323   * hbase]/hbase/.logs/ServerName/logfile or [base directory for
324   * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and
325   * server-specific directories.
326   * @return null if it's not a log file. Returns the ServerName of the region server that created
327   *         this log file otherwise.
328   */
329  public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
330    throws IOException {
331    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
332      return null;
333    }
334
335    if (conf == null) {
336      throw new IllegalArgumentException("parameter conf must be set");
337    }
338
339    final String rootDir = conf.get(HConstants.HBASE_DIR);
340    if (rootDir == null || rootDir.isEmpty()) {
341      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
342    }
343
344    final StringBuilder startPathSB = new StringBuilder(rootDir);
345    if (!rootDir.endsWith("/")) {
346      startPathSB.append('/');
347    }
348    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
349    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
350      startPathSB.append('/');
351    }
352    final String startPath = startPathSB.toString();
353
354    String fullPath;
355    try {
356      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
357    } catch (IllegalArgumentException e) {
358      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
359      return null;
360    }
361
362    if (!fullPath.startsWith(startPath)) {
363      return null;
364    }
365
366    final String serverNameAndFile = fullPath.substring(startPath.length());
367
368    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
369      // Either it's a file (not a directory) or it's not a ServerName format
370      return null;
371    }
372
373    Path p = new Path(path);
374    return getServerNameFromWALDirectoryName(p);
375  }
376
377  /**
378   * This function returns region server name from a log file name which is in one of the following
379   * formats:
380   * <ul>
381   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;-splitting/...</li>
382   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
383   * </ul>
384   * @return null if the passed in logFile isn't a valid WAL file path
385   */
386  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
387    String logDirName = logFile.getParent().getName();
388    // We were passed the directory and not a file in it.
389    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
390      logDirName = logFile.getName();
391    }
392    ServerName serverName = null;
393    if (logDirName.endsWith(SPLITTING_EXT)) {
394      logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
395    }
396    try {
397      serverName = ServerName.parseServerName(logDirName);
398    } catch (IllegalArgumentException | IllegalStateException ex) {
399      serverName = null;
400      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
401    }
402    if (serverName != null && serverName.getStartcode() < 0) {
403      LOG.warn("Invalid log file path=" + logFile);
404      serverName = null;
405    }
406    return serverName;
407  }
408
409  public static boolean isMetaFile(Path p) {
410    return isMetaFile(p.getName());
411  }
412
413  /** Returns True if String ends in {@link #META_WAL_PROVIDER_ID} */
414  public static boolean isMetaFile(String p) {
415    return p != null && p.endsWith(META_WAL_PROVIDER_ID);
416  }
417
418  /**
419   * Comparator used to compare WAL files together based on their start time. Just compares start
420   * times and nothing else.
421   */
422  public static class WALStartTimeComparator implements Comparator<Path> {
423    @Override
424    public int compare(Path o1, Path o2) {
425      return Long.compare(getTS(o1), getTS(o2));
426    }
427
428    /**
429     * Split a path to get the start time For example: 10.20.20.171%3A60020.1277499063250 Could also
430     * be a meta WAL which adds a '.meta' suffix or a synchronous replication WAL which adds a
431     * '.syncrep' suffix. Check.
432     * @param p path to split
433     * @return start time
434     */
435    public static long getTS(Path p) {
436      return WAL.getTimestamp(p.getName());
437    }
438  }
439
440  public static boolean isArchivedLogFile(Path p) {
441    String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
442    return p.toString().contains(oldLog);
443  }
444
445  /**
446   * Find the archived WAL file path if it is not able to locate in WALs dir.
447   * @param path - active WAL file path
448   * @param conf - configuration
449   * @return archived path if exists, null - otherwise
450   * @throws IOException exception
451   */
452  public static Path findArchivedLog(Path path, Configuration conf) throws IOException {
453    // If the path contains oldWALs keyword then exit early.
454    if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) {
455      return null;
456    }
457    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
458    FileSystem fs = path.getFileSystem(conf);
459    // Try finding the log in old dir
460    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
461    Path archivedLogLocation = new Path(oldLogDir, path.getName());
462    if (fs.exists(archivedLogLocation)) {
463      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
464      return archivedLogLocation;
465    }
466
467    ServerName serverName = getServerNameFromWALDirectoryName(path);
468    // Try finding the log in separate old log dir
469    oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
470      .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
471    archivedLogLocation = new Path(oldLogDir, path.getName());
472    if (fs.exists(archivedLogLocation)) {
473      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
474      return archivedLogLocation;
475    }
476    LOG.error("Couldn't locate log: " + path);
477    return null;
478  }
479
480  /**
481   * Opens WAL reader with retries and additional exception handling
482   * @param path path to WAL file
483   * @param conf configuration
484   * @return WAL Reader instance
485   */
486  public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
487    throws IOException {
488    long retryInterval = 2000; // 2 sec
489    int maxAttempts = 30;
490    int attempt = 0;
491    Exception ee = null;
492    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
493    while (reader == null && attempt++ < maxAttempts) {
494      try {
495        // Detect if this is a new file, if so get a new reader else
496        // reset the current reader so that we see the new data
497        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
498        return reader;
499      } catch (FileNotFoundException fnfe) {
500        // If the log was archived, continue reading from there
501        Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
502        // archivedLog can be null if unable to locate in archiveDir.
503        if (archivedLog != null) {
504          return openReader(archivedLog, conf);
505        } else {
506          throw fnfe;
507        }
508      } catch (LeaseNotRecoveredException lnre) {
509        // HBASE-15019 the WAL was not closed due to some hiccup.
510        LOG.warn("Try to recover the WAL lease " + path, lnre);
511        recoverLease(conf, path);
512        reader = null;
513        ee = lnre;
514      } catch (NullPointerException npe) {
515        // Workaround for race condition in HDFS-4380
516        // which throws a NPE if we open a file before any data node has the most recent block
517        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
518        LOG.warn("Got NPE opening reader, will retry.");
519        reader = null;
520        ee = npe;
521      }
522      if (reader == null) {
523        // sleep before next attempt
524        try {
525          Thread.sleep(retryInterval);
526        } catch (InterruptedException e) {
527        }
528      }
529    }
530    throw new IOException("Could not open reader", ee);
531  }
532
533  // For HBASE-15019
534  private static void recoverLease(final Configuration conf, final Path path) {
535    try {
536      final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
537      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
538        @Override
539        public boolean progress() {
540          LOG.debug("Still trying to recover WAL lease: " + path);
541          return true;
542        }
543      });
544    } catch (IOException e) {
545      LOG.warn("unable to recover lease for WAL: " + path, e);
546    }
547  }
548
549  @Override
550  public void addWALActionsListener(WALActionsListener listener) {
551    listeners.add(listener);
552  }
553
554  /**
555   * Get prefix of the log from its name, assuming WAL name in format of
556   * log_prefix.filenumber.log_suffix
557   * @param name Name of the WAL to parse
558   * @return prefix of the log
559   * @see AbstractFSWAL#getCurrentFileName()
560   */
561  public static String getWALPrefixFromWALName(String name) {
562    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
563    return name.substring(0, endIndex);
564  }
565}