001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.EOFException;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.time.Instant;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.RemoteIterator;
035import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
036import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
037import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
038import org.apache.hadoop.hbase.wal.WAL;
039import org.apache.hadoop.hbase.wal.WAL.Entry;
040import org.apache.hadoop.hbase.wal.WALEdit;
041import org.apache.hadoop.hbase.wal.WALFactory;
042import org.apache.hadoop.hbase.wal.WALKey;
043import org.apache.hadoop.hbase.wal.WALStreamReader;
044import org.apache.hadoop.io.Writable;
045import org.apache.hadoop.mapreduce.InputFormat;
046import org.apache.hadoop.mapreduce.InputSplit;
047import org.apache.hadoop.mapreduce.JobContext;
048import org.apache.hadoop.mapreduce.RecordReader;
049import org.apache.hadoop.mapreduce.TaskAttemptContext;
050import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
051import org.apache.hadoop.mapreduce.security.TokenCache;
052import org.apache.hadoop.util.StringUtils;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057/**
058 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
059 */
060@InterfaceAudience.Public
061public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
062  private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class);
063
064  public static final String START_TIME_KEY = "wal.start.time";
065  public static final String END_TIME_KEY = "wal.end.time";
066
067  /**
068   * {@link InputSplit} for {@link WAL} files. Each split represent exactly one log file.
069   */
070  static class WALSplit extends InputSplit implements Writable {
071    private String logFileName;
072    private long fileSize;
073    private long startTime;
074    private long endTime;
075
076    /** for serialization */
077    public WALSplit() {
078    }
079
080    /**
081     * Represent an WALSplit, i.e. a single WAL file. Start- and EndTime are managed by the split,
082     * so that WAL files can be filtered before WALEdits are passed to the mapper(s).
083     */
084    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
085      this.logFileName = logFileName;
086      this.fileSize = fileSize;
087      this.startTime = startTime;
088      this.endTime = endTime;
089    }
090
091    @Override
092    public long getLength() throws IOException, InterruptedException {
093      return fileSize;
094    }
095
096    @Override
097    public String[] getLocations() throws IOException, InterruptedException {
098      // TODO: Find the data node with the most blocks for this WAL?
099      return new String[] {};
100    }
101
102    public String getLogFileName() {
103      return logFileName;
104    }
105
106    public long getStartTime() {
107      return startTime;
108    }
109
110    public long getEndTime() {
111      return endTime;
112    }
113
114    @Override
115    public void readFields(DataInput in) throws IOException {
116      logFileName = in.readUTF();
117      fileSize = in.readLong();
118      startTime = in.readLong();
119      endTime = in.readLong();
120    }
121
122    @Override
123    public void write(DataOutput out) throws IOException {
124      out.writeUTF(logFileName);
125      out.writeLong(fileSize);
126      out.writeLong(startTime);
127      out.writeLong(endTime);
128    }
129
130    @Override
131    public String toString() {
132      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
133    }
134  }
135
136  /**
   * {@link RecordReader} for a {@link WAL} file. Implementation shared with deprecated
138   * HLogInputFormat.
139   */
140  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
141    private WALStreamReader reader = null;
142    // visible until we can remove the deprecated HLogInputFormat
143    Entry currentEntry = new Entry();
144    private long startTime;
145    private long endTime;
146    private Configuration conf;
147    private Path logFile;
148    private long currentPos;
149
150    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
151        justification = "HDFS-4380")
152    private WALStreamReader openReader(Path path, long startPosition) throws IOException {
153      long retryInterval = 2000; // 2 sec
154      int maxAttempts = 30;
155      int attempt = 0;
156      Exception ee = null;
157      WALStreamReader reader = null;
158      while (reader == null && attempt++ < maxAttempts) {
159        try {
160          // Detect if this is a new file, if so get a new reader else
161          // reset the current reader so that we see the new data
162          reader =
163            WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition);
164          return reader;
165        } catch (WALHeaderEOFException wheofe) {
166          // We hit EOF while reading the WAL header. A file that ever had an entry synced to it
167          // necessarily has a complete, readable header (a sync flushes the header too), so a
168          // header EOF means the file holds nothing recoverable right now. For a file that is not
169          // being actively written (a closed/archived WAL, or one left empty by a crashed
170          // RegionServer) the header never appears, so retrying only delays an inevitable skip.
171          // The one case a retry could help is a WAL still being written by the legacy
172          // (non-async) writer that has not yet flushed its header; but we skip that too.
173          LOG.warn("Got WALHeaderEOFException opening reader for {}, skipping empty WAL file.",
174            path, wheofe);
175          return null;
176        } catch (LeaseNotRecoveredException lnre) {
177          // HBASE-15019 the WAL was not closed due to some hiccup.
178          LOG.warn("Try to recover the WAL lease " + path, lnre);
179          AbstractFSWALProvider.recoverLease(conf, path);
180          reader = null;
181          ee = lnre;
182        } catch (NullPointerException npe) {
183          // Workaround for race condition in HDFS-4380
184          // which throws a NPE if we open a file before any data node has the most recent block
185          // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
186          LOG.warn("Got NPE opening reader, will retry.");
187          reader = null;
188          ee = npe;
189        }
190        if (reader == null) {
191          // sleep before next attempt
192          try {
193            Thread.sleep(retryInterval);
194          } catch (InterruptedException e) {
195            Thread.currentThread().interrupt();
196          }
197        }
198      }
199      throw new IOException("Could not open reader", ee);
200    }
201
202    @Override
203    public void initialize(InputSplit split, TaskAttemptContext context)
204      throws IOException, InterruptedException {
205      WALSplit hsplit = (WALSplit) split;
206      logFile = new Path(hsplit.getLogFileName());
207      conf = context.getConfiguration();
208      LOG.info("Opening {} for {}", logFile, split);
209      openReader(logFile);
210      this.startTime = hsplit.getStartTime();
211      this.endTime = hsplit.getEndTime();
212    }
213
214    private void openReader(Path path) throws IOException {
215      closeReader();
216      reader = openReader(path, currentPos > 0 ? currentPos : -1);
217      setCurrentPath(path);
218    }
219
220    private void setCurrentPath(Path path) {
221      this.logFile = path;
222    }
223
224    private void closeReader() throws IOException {
225      if (reader != null) {
226        reader.close();
227        reader = null;
228      }
229    }
230
231    @Override
232    public boolean nextKeyValue() throws IOException, InterruptedException {
233      if (reader == null) {
234        return false;
235      }
236      this.currentPos = reader.getPosition();
237      Entry temp;
238      long i = -1;
239      try {
240        do {
241          // skip older entries
242          try {
243            temp = reader.next(currentEntry);
244            i++;
245          } catch (EOFException x) {
246            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
247              + " (This is normal when a RegionServer crashed.)");
248            return false;
249          }
250        } while (temp != null && temp.getKey().getWriteTime() < startTime);
251
252        if (temp == null) {
253          if (i > 0) {
254            LOG.info("Skipped " + i + " entries.");
255          }
256          LOG.info("Reached end of file.");
257          return false;
258        } else if (i > 0) {
259          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
260        }
261        boolean res = temp.getKey().getWriteTime() <= endTime;
262        if (!res) {
263          LOG.info(
264            "Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
265        }
266        return res;
267      } catch (IOException e) {
268        Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
269        // archivedLog can be null if unable to locate in archiveDir.
270        if (archivedLog != null) {
271          openReader(archivedLog);
272          // Try call again in recursion
273          return nextKeyValue();
274        } else {
275          throw e;
276        }
277      }
278    }
279
280    @Override
281    public WALEdit getCurrentValue() throws IOException, InterruptedException {
282      return currentEntry.getEdit();
283    }
284
285    @Override
286    public float getProgress() throws IOException, InterruptedException {
287      // N/A depends on total number of entries, which is unknown
288      return 0;
289    }
290
291    @Override
292    public void close() throws IOException {
293      LOG.info("Closing reader");
294      if (reader != null) {
295        this.reader.close();
296      }
297    }
298  }
299
300  /**
301   * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer need to
302   * support HLogInputFormat.
303   */
304  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
305    @Override
306    public WALKey getCurrentKey() throws IOException, InterruptedException {
307      return currentEntry.getKey();
308    }
309  }
310
311  @Override
312  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
313    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
314  }
315
316  /**
317   * implementation shared with deprecated HLogInputFormat
318   */
319  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
320    throws IOException, InterruptedException {
321    Configuration conf = context.getConfiguration();
322    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
323    Path[] inputPaths = getInputPaths(conf);
324    // get delegation token for the filesystem
325    TokenCache.obtainTokensForNamenodes(context.getCredentials(), inputPaths, conf);
326    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
327    long endTime = conf.getLong(endKey, Long.MAX_VALUE);
328
329    List<FileStatus> allFiles = new ArrayList<FileStatus>();
330    for (Path inputPath : inputPaths) {
331      FileSystem fs = inputPath.getFileSystem(conf);
332      try {
333        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime, conf);
334        allFiles.addAll(files);
335      } catch (FileNotFoundException e) {
336        if (ignoreMissing) {
337          LOG.warn("File " + inputPath + " is missing. Skipping it.");
338          continue;
339        }
340        throw e;
341      }
342    }
343    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
344    for (FileStatus file : allFiles) {
345      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
346    }
347    return splits;
348  }
349
350  private Path[] getInputPaths(Configuration conf) {
351    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
352    return StringUtils
353      .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
354  }
355
356  /**
357   * @param startTime If file looks like it has a timestamp in its name, we'll check if newer or
358   *                  equal to this value else we will filter out the file. If name does not seem to
359   *                  have a timestamp, we will just return it w/o filtering.
360   * @param endTime   If file looks like it has a timestamp in its name, we'll check if older or
361   *                  equal to this value else we will filter out the file. If name does not seem to
362   *                  have a timestamp, we will just return it w/o filtering.
363   */
364  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime,
365    Configuration conf) throws IOException {
366    List<FileStatus> result = new ArrayList<>();
367    LOG.debug("Scanning " + dir.toString() + " for WAL files");
368    RemoteIterator<LocatedFileStatus> iter = listLocatedFileStatus(fs, dir, conf);
369    if (!iter.hasNext()) {
370      return Collections.emptyList();
371    }
372    while (iter.hasNext()) {
373      LocatedFileStatus file = iter.next();
374      if (file.isDirectory()) {
375        // Recurse into sub directories
376        result.addAll(getFiles(fs, file.getPath(), startTime, endTime, conf));
377      } else {
378        addFile(result, file, startTime, endTime);
379      }
380    }
381    // TODO: These results should be sorted? Results could be content of recovered.edits directory
382    // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and
383    // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator
384    return result;
385  }
386
387  static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime,
388    long endTime) {
389    long timestamp = AbstractFSWALProvider.getTimestamp(lfs.getPath().getName());
390    if (timestamp > 0) {
391      // Looks like a valid timestamp.
392      if (timestamp <= endTime && timestamp >= startTime) {
393        LOG.info("Found {}", lfs.getPath());
394        result.add(lfs);
395      } else {
396        LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(), startTime,
397          Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime));
398      }
399    } else {
400      // If no timestamp, add it regardless.
401      LOG.info("Found (no-timestamp!) {}", lfs);
402      result.add(lfs);
403    }
404  }
405
406  @Override
407  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
408    TaskAttemptContext context) throws IOException, InterruptedException {
409    return new WALKeyRecordReader();
410  }
411
412  /**
413   * Attempts to return the {@link LocatedFileStatus} for the given directory. If the directory does
414   * not exist, it will check if the directory is an archived log file and try to find it
415   */
416  private static RemoteIterator<LocatedFileStatus> listLocatedFileStatus(FileSystem fs, Path dir,
417    Configuration conf) throws IOException {
418    try {
419      return fs.listLocatedStatus(dir);
420    } catch (FileNotFoundException e) {
421      if (AbstractFSWALProvider.isArchivedLogFile(dir)) {
422        throw e;
423      }
424
425      LOG.warn("Log file {} not found, trying to find it in archive directory.", dir);
426      Path archiveFile = AbstractFSWALProvider.findArchivedLog(dir, conf);
427      if (archiveFile == null) {
428        LOG.error("Did not find archive file for {}", dir);
429        throw e;
430      }
431
432      return fs.listLocatedStatus(archiveFile);
433    }
434  }
435}