001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.Comparator;
026import java.util.List;
027import java.util.Objects;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.locks.ReadWriteLock;
030import java.util.concurrent.locks.ReentrantReadWriteLock;
031import java.util.regex.Matcher;
032import java.util.regex.Pattern;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Abortable;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
042import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
043import org.apache.hadoop.hbase.util.CancelableProgressable;
044import org.apache.hadoop.hbase.util.CommonFSUtils;
045import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
046import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.apache.yetus.audience.InterfaceStability;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
053
054/**
055 * Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
056 * default, this implementation picks a directory in Hadoop FS based on a combination of
057 * <ul>
058 * <li>the HBase root directory
059 * <li>HConstants.HREGION_LOGDIR_NAME
060 * <li>the given factory's factoryId (usually identifying the regionserver by host:port)
061 * </ul>
062 * It also uses the providerId to differentiate among files.
063 */
064@InterfaceAudience.Private
065@InterfaceStability.Evolving
066public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
067
068  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);
069
  /**
   * Configuration key. When it is set to true, the archive directory name built by
   * {@link #getWALArchiveDirectoryName(Configuration, String)} and the archive location probed by
   * {@link #getArchivedLogPath(Path, Configuration)} get a per-regionserver sub-directory.
   */
  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
  /** Value used for {@link #SEPARATE_OLDLOGDIR} when the key is absent from the configuration. */
  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
073
  /**
   * A {@link WAL.Reader} that is additionally initialized with the file system, path,
   * configuration and (optionally) an already opened input stream. Only public so classes back in
   * regionserver.wal can access it.
   */
  public interface Reader extends WAL.Reader {
    /**
     * Initializes this reader.
     * @param fs File system.
     * @param path Path.
     * @param c Configuration.
     * @param s Input stream that may have been pre-opened by the caller; may be null.
     * @throws IOException declared by the signature; the exact conditions are up to the
     *           implementation
     */
    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
  }
084
  /**
   * The single WAL handed out by this provider. Stays null until {@link #getWAL(RegionInfo)}
   * has created and initialized it; declared volatile because it is read without holding
   * {@link #walCreateLock}.
   */
  protected volatile T wal;
  /** The factory passed to {@code init}; its factoryId is the first part of {@link #logPrefix}. */
  protected WALFactory factory;
  /** The configuration passed to {@code init}. */
  protected Configuration conf;
  /** Listeners registered through {@link #addWALActionsListener(WALActionsListener)}. */
  protected List<WALActionsListener> listeners = new ArrayList<>();
  /** The provider id passed to {@code init}; may be null. */
  protected String providerId;
  /** Set to true by the first {@code init} call; a second call fails. */
  protected AtomicBoolean initialized = new AtomicBoolean(false);
  // for default wal provider, logPrefix won't change
  protected String logPrefix;
  /** The abortable passed to {@code init}. */
  protected Abortable abortable;

  /**
   * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
   * missing the newly created WAL, see HBASE-21503 for more details.
   */
  private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
100
101  /**
102   * @param factory factory that made us, identity used for FS layout. may not be null
103   * @param conf may not be null
104   * @param providerId differentiate between providers from one factory, used for FS layout. may be
105   *          null
106   */
107  @Override
108  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
109      throws IOException {
110    if (!initialized.compareAndSet(false, true)) {
111      throw new IllegalStateException("WALProvider.init should only be called once.");
112    }
113    this.factory = factory;
114    this.conf = conf;
115    this.providerId = providerId;
116    // get log prefix
117    StringBuilder sb = new StringBuilder().append(factory.factoryId);
118    if (providerId != null) {
119      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
120        sb.append(providerId);
121      } else {
122        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
123      }
124    }
125    logPrefix = sb.toString();
126    this.abortable = abortable;
127    doInit(conf);
128  }
129
130  @Override
131  public List<WAL> getWALs() {
132    if (wal != null) {
133      return Lists.newArrayList(wal);
134    }
135    walCreateLock.readLock().lock();
136    try {
137      if (wal == null) {
138        return Collections.emptyList();
139      } else {
140        return Lists.newArrayList(wal);
141      }
142    } finally {
143      walCreateLock.readLock().unlock();
144    }
145  }
146
147  @Override
148  public T getWAL(RegionInfo region) throws IOException {
149    T walCopy = wal;
150    if (walCopy != null) {
151      return walCopy;
152    }
153    walCreateLock.writeLock().lock();
154    try {
155      walCopy = wal;
156      if (walCopy != null) {
157        return walCopy;
158      }
159      walCopy = createWAL();
160      boolean succ = false;
161      try {
162        walCopy.init();
163        succ = true;
164      } finally {
165        if (!succ) {
166          walCopy.close();
167        }
168      }
169      wal = walCopy;
170      return walCopy;
171    } finally {
172      walCreateLock.writeLock().unlock();
173    }
174  }
175
  /**
   * Creates the WAL instance for this provider. {@link #getWAL(RegionInfo)} calls this while
   * holding the creation write lock and afterwards calls {@code init()} on the returned WAL.
   */
  protected abstract T createWAL() throws IOException;

  /**
   * Subclass hook invoked as the last step of {@code init}, after the factory, configuration,
   * provider id, log prefix and abortable fields have been set.
   */
  protected abstract void doInit(Configuration conf) throws IOException;
179
180  @Override
181  public void shutdown() throws IOException {
182    T log = this.wal;
183    if (log != null) {
184      log.shutdown();
185    }
186  }
187
188  @Override
189  public void close() throws IOException {
190    T log = this.wal;
191    if (log != null) {
192      log.close();
193    }
194  }
195
196  /**
197   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
198   * number of files (rolled and active). if either of them aren't, count 0 for that provider.
199   */
200  @Override
201  public long getNumLogFiles() {
202    T log = this.wal;
203    return log == null ? 0 : log.getNumLogFiles();
204  }
205
206  /**
207   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
208   * size of files (only rolled). if either of them aren't, count 0 for that provider.
209   */
210  @Override
211  public long getLogFileSize() {
212    T log = this.wal;
213    return log == null ? 0 : log.getLogFileSize();
214  }
215
216  /**
217   * returns the number of rolled WAL files.
218   */
219  public static int getNumRolledLogFiles(WAL wal) {
220    return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
221  }
222
223  /**
224   * returns the size of rolled WAL files.
225   */
226  public static long getLogFileSize(WAL wal) {
227    return ((AbstractFSWAL<?>) wal).getLogFileSize();
228  }
229
230  /**
231   * return the current filename from the current wal.
232   */
233  public static Path getCurrentFileName(final WAL wal) {
234    return ((AbstractFSWAL<?>) wal).getCurrentFileName();
235  }
236
237  /**
238   * request a log roll, but don't actually do it.
239   */
240  static void requestLogRoll(final WAL wal) {
241    ((AbstractFSWAL<?>) wal).requestLogRoll();
242  }
243
  // should be package private; more visible for use in AbstractFSWAL
  /** Delimiter placed between the factory id and the provider id when the log prefix is built. */
  public static final String WAL_FILE_NAME_DELIMITER = ".";
  /** The hbase:meta region's WAL filename extension */
  public static final String META_WAL_PROVIDER_ID = ".meta";
  // NOTE(review): presumably the provider id used when none is given; it is not referenced
  // anywhere in this file, so confirm against the callers.
  static final String DEFAULT_PROVIDER_ID = "default";

  // Implementation details that currently leak in tests or elsewhere follow
  /** File Extension used while splitting an WAL into regions (HBASE-2312) */
  public static final String SPLITTING_EXT = "-splitting";

  /**
   * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
   * description. Group 1 is the WAL name (prefix), group 2 the run of digits that forms the file
   * number / timestamp, and the optional group 3 a suffix consisting of a dot followed by letters
   * and digits.
   */
  private static final Pattern WAL_FILE_NAME_PATTERN =
    Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?");

  /**
   * Define for when no timestamp found. Returned by {@link #getTimestamp(String)} for names that
   * do not match {@link #WAL_FILE_NAME_PATTERN}.
   */
  private static final long NO_TIMESTAMP = -1L;
265
266  /**
267   * It returns the file create timestamp (the 'FileNum') from the file name. For name format see
268   * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal
269   * @param wal must not be null
270   * @return the file number that is part of the WAL file name
271   */
272  public static long extractFileNumFromWAL(final WAL wal) {
273    final Path walPath = ((AbstractFSWAL<?>) wal).getCurrentFileName();
274    if (walPath == null) {
275      throw new IllegalArgumentException("The WAL path couldn't be null");
276    }
277    String name = walPath.getName();
278    long timestamp = getTimestamp(name);
279    if (timestamp == NO_TIMESTAMP) {
280      throw new IllegalArgumentException(name + " is not a valid wal file name");
281    }
282    return timestamp;
283  }
284
285  /**
286   * A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
287   * &lt;file-creation-timestamp&gt;[.&lt;suffix&gt;]. provider-name is usually made up of a
288   * server-name and a provider-id
289   * @param filename name of the file to validate
290   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
291   */
292  public static boolean validateWALFilename(String filename) {
293    return WAL_FILE_NAME_PATTERN.matcher(filename).matches();
294  }
295
296  /**
297   * Split a WAL filename to get a start time. WALs usually have the time we start writing to them
298   * with as part of their name, usually the suffix. Sometimes there will be an extra suffix as when
299   * it is a WAL for the meta table. For example, WALs might look like this
300   * <code>10.20.20.171%3A60020.1277499063250</code> where <code>1277499063250</code> is the
301   * timestamp. Could also be a meta WAL which adds a '.meta' suffix or a
302   * synchronous replication WAL which adds a '.syncrep' suffix. Check for these. File also may have
303   * no timestamp on it. For example the recovered.edits files are WALs but are named in ascending
304   * order. Here is an example: 0000000000000016310. Allow for this.
305   * @param name Name of the WAL file.
306   * @return Timestamp or {@link #NO_TIMESTAMP}.
307   */
308  public static long getTimestamp(String name) {
309    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
310    return matcher.matches() ? Long.parseLong(matcher.group(2)): NO_TIMESTAMP;
311  }
312
313  /**
314   * Construct the directory name for all WALs on a given server. Dir names currently look like this
315   * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
316   * @param serverName Server name formatted as described in {@link ServerName}
317   * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
318   *         <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
319   */
320  public static String getWALDirectoryName(final String serverName) {
321    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
322    dirName.append("/");
323    dirName.append(serverName);
324    return dirName.toString();
325  }
326
327  /**
328   * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
329   * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
330   * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
331   * @param serverName Server name formatted as described in {@link ServerName}
332   * @return the relative WAL directory name
333   */
334  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
335    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
336    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
337      dirName.append(Path.SEPARATOR);
338      dirName.append(serverName);
339    }
340    return dirName.toString();
341  }
342
343  /**
344   * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
345   * this method ignores the format of the logfile component. Current format: [base directory for
346   * hbase]/hbase/.logs/ServerName/logfile or [base directory for
347   * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and
348   * server-specific directories.
349   * @return null if it's not a log file. Returns the ServerName of the region server that created
350   *         this log file otherwise.
351   */
352  public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
353      throws IOException {
354    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
355      return null;
356    }
357
358    if (conf == null) {
359      throw new IllegalArgumentException("parameter conf must be set");
360    }
361
362    final String rootDir = conf.get(HConstants.HBASE_DIR);
363    if (rootDir == null || rootDir.isEmpty()) {
364      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
365    }
366
367    final StringBuilder startPathSB = new StringBuilder(rootDir);
368    if (!rootDir.endsWith("/")) {
369      startPathSB.append('/');
370    }
371    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
372    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
373      startPathSB.append('/');
374    }
375    final String startPath = startPathSB.toString();
376
377    String fullPath;
378    try {
379      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
380    } catch (IllegalArgumentException e) {
381      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
382      return null;
383    }
384
385    if (!fullPath.startsWith(startPath)) {
386      return null;
387    }
388
389    final String serverNameAndFile = fullPath.substring(startPath.length());
390
391    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
392      // Either it's a file (not a directory) or it's not a ServerName format
393      return null;
394    }
395
396    Path p = new Path(path);
397    return getServerNameFromWALDirectoryName(p);
398  }
399
400  /**
401   * This function returns region server name from a log file name which is in one of the following
402   * formats:
403   * <ul>
404   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;-splitting/...</li>
405   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
406   * </ul>
407   * @return null if the passed in logFile isn't a valid WAL file path
408   */
409  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
410    String logDirName = logFile.getParent().getName();
411    // We were passed the directory and not a file in it.
412    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
413      logDirName = logFile.getName();
414    }
415    ServerName serverName = null;
416    if (logDirName.endsWith(SPLITTING_EXT)) {
417      logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
418    }
419    try {
420      serverName = ServerName.parseServerName(logDirName);
421    } catch (IllegalArgumentException | IllegalStateException ex) {
422      serverName = null;
423      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
424    }
425    if (serverName != null && serverName.getStartcode() < 0) {
426      LOG.warn("Invalid log file path=" + logFile);
427      serverName = null;
428    }
429    return serverName;
430  }
431
432  public static boolean isMetaFile(Path p) {
433    return isMetaFile(p.getName());
434  }
435
436  /**
437   * @return True if String ends in {@link #META_WAL_PROVIDER_ID}
438   */
439  public static boolean isMetaFile(String p) {
440    return p != null && p.endsWith(META_WAL_PROVIDER_ID);
441  }
442
443  /**
444   * Comparator used to compare WAL files together based on their start time.
445   * Just compares start times and nothing else.
446   */
447  public static class WALStartTimeComparator implements Comparator<Path> {
448    @Override
449    public int compare(Path o1, Path o2) {
450      return Long.compare(getTS(o1), getTS(o2));
451    }
452
453    /**
454     * Split a path to get the start time
455     * For example: 10.20.20.171%3A60020.1277499063250
456     * Could also be a meta WAL which adds a '.meta' suffix or a synchronous replication WAL
457     * which adds a '.syncrep' suffix. Check.
458     * @param p path to split
459     * @return start time
460     */
461    private static long getTS(Path p) {
462      return getTimestamp(p.getName());
463    }
464  }
465
466
467
468  public static boolean isArchivedLogFile(Path p) {
469    String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
470    return p.toString().contains(oldLog);
471  }
472
473  /**
474   * Get the archived WAL file path
475   * @param path - active WAL file path
476   * @param conf - configuration
477   * @return archived path if exists, path - otherwise
478   * @throws IOException exception
479   */
480  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
481    Path rootDir = CommonFSUtils.getWALRootDir(conf);
482    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
483    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
484      ServerName serverName = getServerNameFromWALDirectoryName(path);
485      if (serverName == null) {
486        LOG.error("Couldn't locate log: " + path);
487        return path;
488      }
489      oldLogDir = new Path(oldLogDir, serverName.getServerName());
490    }
491    Path archivedLogLocation = new Path(oldLogDir, path.getName());
492    final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
493
494    if (fs.exists(archivedLogLocation)) {
495      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
496      return archivedLogLocation;
497    } else {
498      LOG.error("Couldn't locate log: " + path);
499      return path;
500    }
501  }
502
503  /**
504   * Opens WAL reader with retries and additional exception handling
505   * @param path path to WAL file
506   * @param conf configuration
507   * @return WAL Reader instance
508   */
509  public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
510      throws IOException {
511    long retryInterval = 2000; // 2 sec
512    int maxAttempts = 30;
513    int attempt = 0;
514    Exception ee = null;
515    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
516    while (reader == null && attempt++ < maxAttempts) {
517      try {
518        // Detect if this is a new file, if so get a new reader else
519        // reset the current reader so that we see the new data
520        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
521        return reader;
522      } catch (FileNotFoundException fnfe) {
523        // If the log was archived, continue reading from there
524        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
525        if (!Objects.equals(path, archivedLog)) {
526          return openReader(archivedLog, conf);
527        } else {
528          throw fnfe;
529        }
530      } catch (LeaseNotRecoveredException lnre) {
531        // HBASE-15019 the WAL was not closed due to some hiccup.
532        LOG.warn("Try to recover the WAL lease " + path, lnre);
533        recoverLease(conf, path);
534        reader = null;
535        ee = lnre;
536      } catch (NullPointerException npe) {
537        // Workaround for race condition in HDFS-4380
538        // which throws a NPE if we open a file before any data node has the most recent block
539        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
540        LOG.warn("Got NPE opening reader, will retry.");
541        reader = null;
542        ee = npe;
543      }
544      if (reader == null) {
545        // sleep before next attempt
546        try {
547          Thread.sleep(retryInterval);
548        } catch (InterruptedException e) {
549        }
550      }
551    }
552    throw new IOException("Could not open reader", ee);
553  }
554
555  // For HBASE-15019
556  private static void recoverLease(final Configuration conf, final Path path) {
557    try {
558      final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
559      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
560        @Override
561        public boolean progress() {
562          LOG.debug("Still trying to recover WAL lease: " + path);
563          return true;
564        }
565      });
566    } catch (IOException e) {
567      LOG.warn("unable to recover lease for WAL: " + path, e);
568    }
569  }
570
571  @Override
572  public void addWALActionsListener(WALActionsListener listener) {
573    listeners.add(listener);
574  }
575
576  private static String getWALNameGroupFromWALName(String name, int group) {
577    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
578    if (matcher.matches()) {
579      return matcher.group(group);
580    } else {
581      throw new IllegalArgumentException(name + " is not a valid wal file name");
582    }
583  }
584  /**
585   * Get prefix of the log from its name, assuming WAL name in format of
586   * log_prefix.filenumber.log_suffix
587   * @param name Name of the WAL to parse
588   * @return prefix of the log
589   * @throws IllegalArgumentException if the name passed in is not a valid wal file name
590   * @see AbstractFSWAL#getCurrentFileName()
591   */
592  public static String getWALPrefixFromWALName(String name) {
593    return getWALNameGroupFromWALName(name, 1);
594  }
595}