001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.List;
026import java.util.Objects;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.locks.ReadWriteLock;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030import java.util.regex.Pattern;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataInputStream;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Abortable;
036import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
041import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
042import org.apache.hadoop.hbase.util.CancelableProgressable;
043import org.apache.hadoop.hbase.util.CommonFSUtils;
044import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
045import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.apache.yetus.audience.InterfaceStability;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
052
053/**
054 * Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
055 * default, this implementation picks a directory in Hadoop FS based on a combination of
056 * <ul>
057 * <li>the HBase root directory
058 * <li>HConstants.HREGION_LOGDIR_NAME
059 * <li>the given factory's factoryId (usually identifying the regionserver by host:port)
060 * </ul>
061 * It also uses the providerId to differentiate among files.
062 */
063@InterfaceAudience.Private
064@InterfaceStability.Evolving
065public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
066
067  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);
068
  /**
   * Configuration key. When set to {@code true}, the archive (old WAL) directory name built by
   * {@link #getWALArchiveDirectoryName(Configuration, String)} gets a per-regionserver
   * sub-directory appended; otherwise all archived WALs share one directory.
   */
  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
  /** Default for {@link #SEPARATE_OLDLOGDIR}: archived WALs are not separated by regionserver. */
  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
072
  /**
   * A {@link WAL.Reader} that is initialized against a file on a Hadoop file system.
   * <p>
   * Only public so classes back in regionserver.wal can access it.
   */
  public interface Reader extends WAL.Reader {
    /**
     * Initializes this reader for the given WAL file.
     * @param fs File system.
     * @param path Path.
     * @param c Configuration.
     * @param s Input stream that may have been pre-opened by the caller; may be null.
     * @throws IOException if the implementation fails to initialize
     */
    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
  }
083
  // The single WAL served by this provider. Null until getWAL(...) has created and initialized
  // it; volatile because getWAL/getWALs read it without holding walCreateLock.
  protected volatile T wal;
  // The factory, configuration and provider id handed to init(...), stored as-is.
  protected WALFactory factory;
  protected Configuration conf;
  // Listeners registered through addWALActionsListener(...).
  protected List<WALActionsListener> listeners = new ArrayList<>();
  protected String providerId;
  // Flipped to true by the first init(...) call; a second call is rejected.
  protected AtomicBoolean initialized = new AtomicBoolean(false);
  // for default wal provider, logPrefix won't change
  protected String logPrefix;
  // The abortable handed to init(...); this class only stores it.
  protected Abortable abortable;

  /**
   * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
   * missing the newly created WAL, see HBASE-21503 for more details.
   */
  private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
099
100  /**
101   * @param factory factory that made us, identity used for FS layout. may not be null
102   * @param conf may not be null
103   * @param providerId differentiate between providers from one factory, used for FS layout. may be
104   *          null
105   */
106  @Override
107  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
108      throws IOException {
109    if (!initialized.compareAndSet(false, true)) {
110      throw new IllegalStateException("WALProvider.init should only be called once.");
111    }
112    this.factory = factory;
113    this.conf = conf;
114    this.providerId = providerId;
115    // get log prefix
116    StringBuilder sb = new StringBuilder().append(factory.factoryId);
117    if (providerId != null) {
118      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
119        sb.append(providerId);
120      } else {
121        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
122      }
123    }
124    logPrefix = sb.toString();
125    this.abortable = abortable;
126    doInit(conf);
127  }
128
129  @Override
130  public List<WAL> getWALs() {
131    if (wal != null) {
132      return Lists.newArrayList(wal);
133    }
134    walCreateLock.readLock().lock();
135    try {
136      if (wal == null) {
137        return Collections.emptyList();
138      } else {
139        return Lists.newArrayList(wal);
140      }
141    } finally {
142      walCreateLock.readLock().unlock();
143    }
144  }
145
  /**
   * Returns the single WAL of this provider, creating and initializing it on first use.
   * <p>
   * The {@code wal} field is checked without a lock first; if it is still null the write lock is
   * taken, the field is checked again, and only then is a WAL created via {@link #createWAL()} and
   * initialized. The new WAL is stored in the field only after its {@code init()} succeeded.
   * @param region not used by this implementation; every region gets the same WAL
   * @return the WAL, never null
   * @throws FailedCloseWALAfterInitializedErrorException if {@code init()} failed and closing the
   *           half-initialized WAL failed as well
   * @throws IOException if creating or initializing the WAL fails
   */
  @Override
  public T getWAL(RegionInfo region) throws IOException {
    // Fast path: the WAL has already been published.
    T walCopy = wal;
    if (walCopy != null) {
      return walCopy;
    }
    walCreateLock.writeLock().lock();
    try {
      // Re-check under the lock: another thread may have created the WAL while we were waiting.
      walCopy = wal;
      if (walCopy != null) {
        return walCopy;
      }
      walCopy = createWAL();
      boolean succ = false;
      try {
        walCopy.init();
        succ = true;
      } finally {
        if (!succ) {
          // init() failed: close the WAL we just created. If close() fails too, the exception
          // thrown here replaces the one raised by init().
          try {
            walCopy.close();
          } catch (Throwable t) {
            throw new FailedCloseWALAfterInitializedErrorException(
              "Failed close after init wal failed.", t);
          }
        }
      }
      // Publish only a fully initialized WAL.
      wal = walCopy;
      return walCopy;
    } finally {
      walCreateLock.writeLock().unlock();
    }
  }
179
  /**
   * Creates the WAL instance served by this provider. Called from {@link #getWAL(RegionInfo)} while
   * the create lock is held; the caller invokes {@code init()} on the returned instance, and
   * closes it if that initialization fails.
   * @return a new, not yet initialized WAL
   * @throws IOException if the WAL cannot be created
   */
  protected abstract T createWAL() throws IOException;
181
  /**
   * Subclass hook invoked as the last step of
   * {@link #init(WALFactory, Configuration, String, Abortable)}, after the factory, configuration,
   * provider id, log prefix and abortable fields have been set.
   * @param conf the configuration passed to {@code init}
   * @throws IOException if subclass initialization fails
   */
  protected abstract void doInit(Configuration conf) throws IOException;
183
184  @Override
185  public void shutdown() throws IOException {
186    T log = this.wal;
187    if (log != null) {
188      log.shutdown();
189    }
190  }
191
192  @Override
193  public void close() throws IOException {
194    T log = this.wal;
195    if (log != null) {
196      log.close();
197    }
198  }
199
200  /**
201   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
202   * number of files (rolled and active). if either of them aren't, count 0 for that provider.
203   */
204  @Override
205  public long getNumLogFiles() {
206    T log = this.wal;
207    return log == null ? 0 : log.getNumLogFiles();
208  }
209
210  /**
211   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
212   * size of files (only rolled). if either of them aren't, count 0 for that provider.
213   */
214  @Override
215  public long getLogFileSize() {
216    T log = this.wal;
217    return log == null ? 0 : log.getLogFileSize();
218  }
219
220  /**
221   * returns the number of rolled WAL files.
222   */
223  public static int getNumRolledLogFiles(WAL wal) {
224    return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
225  }
226
227  /**
228   * returns the size of rolled WAL files.
229   */
230  public static long getLogFileSize(WAL wal) {
231    return ((AbstractFSWAL<?>) wal).getLogFileSize();
232  }
233
234  /**
235   * return the current filename from the current wal.
236   */
237  public static Path getCurrentFileName(final WAL wal) {
238    return ((AbstractFSWAL<?>) wal).getCurrentFileName();
239  }
240
241  /**
242   * request a log roll, but don't actually do it.
243   */
244  static void requestLogRoll(final WAL wal) {
245    ((AbstractFSWAL<?>) wal).requestLogRoll();
246  }
247
  // should be package private; more visible for use in AbstractFSWAL
  /** Delimiter placed between the components of a WAL file name. */
  public static final String WAL_FILE_NAME_DELIMITER = ".";
  /**
   * The hbase:meta region's WAL filename extension. It already starts with
   * {@link #WAL_FILE_NAME_DELIMITER}.
   */
  public static final String META_WAL_PROVIDER_ID = ".meta";
  /** Provider id literal {@code "default"}; not referenced inside this class. */
  static final String DEFAULT_PROVIDER_ID = "default";

  // Implementation details that currently leak in tests or elsewhere follow
  /** File Extension used while splitting an WAL into regions (HBASE-2312) */
  public static final String SPLITTING_EXT = "-splitting";
257
258  /**
259   * It returns the file create timestamp from the file name. For name format see
260   * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal
261   * @param wal must not be null
262   * @return the file number that is part of the WAL file name
263   */
264  public static long extractFileNumFromWAL(final WAL wal) {
265    final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName();
266    if (walName == null) {
267      throw new IllegalArgumentException("The WAL path couldn't be null");
268    }
269    final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
270    return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]);
271  }
272
273  /**
274   * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
275   * description.
276   */
277  private static final Pattern pattern =
278    Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*");
279
280  /**
281   * A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
282   * &lt;file-creation-timestamp&gt;[.meta]. provider-name is usually made up of a server-name and a
283   * provider-id
284   * @param filename name of the file to validate
285   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
286   */
287  public static boolean validateWALFilename(String filename) {
288    return pattern.matcher(filename).matches();
289  }
290
291  /**
292   * Construct the directory name for all WALs on a given server. Dir names currently look like this
293   * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
294   * @param serverName Server name formatted as described in {@link ServerName}
295   * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
296   *         <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
297   */
298  public static String getWALDirectoryName(final String serverName) {
299    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
300    dirName.append("/");
301    dirName.append(serverName);
302    return dirName.toString();
303  }
304
305  /**
306   * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
307   * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
308   * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
309   * @param serverName Server name formatted as described in {@link ServerName}
310   * @return the relative WAL directory name
311   */
312  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
313    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
314    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
315      dirName.append(Path.SEPARATOR);
316      dirName.append(serverName);
317    }
318    return dirName.toString();
319  }
320
321  /**
322   * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
323   * this method ignores the format of the logfile component. Current format: [base directory for
324   * hbase]/hbase/.logs/ServerName/logfile or [base directory for
325   * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and
326   * server-specific directories.
327   * @return null if it's not a log file. Returns the ServerName of the region server that created
328   *         this log file otherwise.
329   */
330  public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
331      throws IOException {
332    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
333      return null;
334    }
335
336    if (conf == null) {
337      throw new IllegalArgumentException("parameter conf must be set");
338    }
339
340    final String rootDir = conf.get(HConstants.HBASE_DIR);
341    if (rootDir == null || rootDir.isEmpty()) {
342      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
343    }
344
345    final StringBuilder startPathSB = new StringBuilder(rootDir);
346    if (!rootDir.endsWith("/")) {
347      startPathSB.append('/');
348    }
349    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
350    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
351      startPathSB.append('/');
352    }
353    final String startPath = startPathSB.toString();
354
355    String fullPath;
356    try {
357      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
358    } catch (IllegalArgumentException e) {
359      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
360      return null;
361    }
362
363    if (!fullPath.startsWith(startPath)) {
364      return null;
365    }
366
367    final String serverNameAndFile = fullPath.substring(startPath.length());
368
369    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
370      // Either it's a file (not a directory) or it's not a ServerName format
371      return null;
372    }
373
374    Path p = new Path(path);
375    return getServerNameFromWALDirectoryName(p);
376  }
377
378  /**
379   * This function returns region server name from a log file name which is in one of the following
380   * formats:
381   * <ul>
382   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;-splitting/...</li>
383   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
384   * </ul>
385   * @return null if the passed in logFile isn't a valid WAL file path
386   */
387  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
388    String logDirName = logFile.getParent().getName();
389    // We were passed the directory and not a file in it.
390    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
391      logDirName = logFile.getName();
392    }
393    ServerName serverName = null;
394    if (logDirName.endsWith(SPLITTING_EXT)) {
395      logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
396    }
397    try {
398      serverName = ServerName.parseServerName(logDirName);
399    } catch (IllegalArgumentException | IllegalStateException ex) {
400      serverName = null;
401      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
402    }
403    if (serverName != null && serverName.getStartcode() < 0) {
404      LOG.warn("Invalid log file path=" + logFile);
405      serverName = null;
406    }
407    return serverName;
408  }
409
410  public static boolean isMetaFile(Path p) {
411    return isMetaFile(p.getName());
412  }
413
414  /**
415   * @return True if String ends in {@link #META_WAL_PROVIDER_ID}
416   */
417  public static boolean isMetaFile(String p) {
418    return p != null && p.endsWith(META_WAL_PROVIDER_ID);
419  }
420
421  /**
422   * Comparator used to compare WAL files together based on their start time.
423   * Just compares start times and nothing else.
424   */
425  public static class WALStartTimeComparator implements Comparator<Path> {
426    @Override
427    public int compare(Path o1, Path o2) {
428      return Long.compare(getTS(o1), getTS(o2));
429    }
430
431    /**
432     * Split a path to get the start time
433     * For example: 10.20.20.171%3A60020.1277499063250
434     * Could also be a meta WAL which adds a '.meta' suffix or a synchronous replication WAL
435     * which adds a '.syncrep' suffix. Check.
436     * @param p path to split
437     * @return start time
438     */
439    private static long getTS(Path p) {
440      return WAL.getTimestamp(p.getName());
441    }
442  }
443
444
445
446  public static boolean isArchivedLogFile(Path p) {
447    String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
448    return p.toString().contains(oldLog);
449  }
450
451  /**
452   * Get the archived WAL file path
453   * @param path - active WAL file path
454   * @param conf - configuration
455   * @return archived path if exists, path - otherwise
456   * @throws IOException exception
457   */
458  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
459    Path rootDir = CommonFSUtils.getWALRootDir(conf);
460    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
461    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
462      ServerName serverName = getServerNameFromWALDirectoryName(path);
463      if (serverName == null) {
464        LOG.error("Couldn't locate log: " + path);
465        return path;
466      }
467      oldLogDir = new Path(oldLogDir, serverName.getServerName());
468    }
469    Path archivedLogLocation = new Path(oldLogDir, path.getName());
470    final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
471
472    if (fs.exists(archivedLogLocation)) {
473      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
474      return archivedLogLocation;
475    } else {
476      LOG.error("Couldn't locate log: " + path);
477      return path;
478    }
479  }
480
481  /**
482   * Opens WAL reader with retries and additional exception handling
483   * @param path path to WAL file
484   * @param conf configuration
485   * @return WAL Reader instance
486   */
487  public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
488      throws IOException {
489    long retryInterval = 2000; // 2 sec
490    int maxAttempts = 30;
491    int attempt = 0;
492    Exception ee = null;
493    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
494    while (reader == null && attempt++ < maxAttempts) {
495      try {
496        // Detect if this is a new file, if so get a new reader else
497        // reset the current reader so that we see the new data
498        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
499        return reader;
500      } catch (FileNotFoundException fnfe) {
501        // If the log was archived, continue reading from there
502        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
503        if (!Objects.equals(path, archivedLog)) {
504          return openReader(archivedLog, conf);
505        } else {
506          throw fnfe;
507        }
508      } catch (LeaseNotRecoveredException lnre) {
509        // HBASE-15019 the WAL was not closed due to some hiccup.
510        LOG.warn("Try to recover the WAL lease " + path, lnre);
511        recoverLease(conf, path);
512        reader = null;
513        ee = lnre;
514      } catch (NullPointerException npe) {
515        // Workaround for race condition in HDFS-4380
516        // which throws a NPE if we open a file before any data node has the most recent block
517        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
518        LOG.warn("Got NPE opening reader, will retry.");
519        reader = null;
520        ee = npe;
521      }
522      if (reader == null) {
523        // sleep before next attempt
524        try {
525          Thread.sleep(retryInterval);
526        } catch (InterruptedException e) {
527        }
528      }
529    }
530    throw new IOException("Could not open reader", ee);
531  }
532
533  // For HBASE-15019
534  private static void recoverLease(final Configuration conf, final Path path) {
535    try {
536      final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
537      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
538        @Override
539        public boolean progress() {
540          LOG.debug("Still trying to recover WAL lease: " + path);
541          return true;
542        }
543      });
544    } catch (IOException e) {
545      LOG.warn("unable to recover lease for WAL: " + path, e);
546    }
547  }
548
549  @Override
550  public void addWALActionsListener(WALActionsListener listener) {
551    listeners.add(listener);
552  }
553
554  /**
555   * Get prefix of the log from its name, assuming WAL name in format of
556   * log_prefix.filenumber.log_suffix
557   * @param name Name of the WAL to parse
558   * @return prefix of the log
559   * @see AbstractFSWAL#getCurrentFileName()
560   */
561  public static String getWALPrefixFromWALName(String name) {
562    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
563    return name.substring(0, endIndex);
564  }
565}