001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.EOFException;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.time.Instant;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.RemoteIterator;
035import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
036import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
037import org.apache.hadoop.hbase.wal.WAL;
038import org.apache.hadoop.hbase.wal.WAL.Entry;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.apache.hadoop.hbase.wal.WALFactory;
041import org.apache.hadoop.hbase.wal.WALKey;
042import org.apache.hadoop.hbase.wal.WALStreamReader;
043import org.apache.hadoop.io.Writable;
044import org.apache.hadoop.mapreduce.InputFormat;
045import org.apache.hadoop.mapreduce.InputSplit;
046import org.apache.hadoop.mapreduce.JobContext;
047import org.apache.hadoop.mapreduce.RecordReader;
048import org.apache.hadoop.mapreduce.TaskAttemptContext;
049import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
050import org.apache.hadoop.mapreduce.security.TokenCache;
051import org.apache.hadoop.util.StringUtils;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
058 */
059@InterfaceAudience.Public
060public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
061  private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class);
062
063  public static final String START_TIME_KEY = "wal.start.time";
064  public static final String END_TIME_KEY = "wal.end.time";
065
066  /**
067   * {@link InputSplit} for {@link WAL} files. Each split represent exactly one log file.
068   */
069  static class WALSplit extends InputSplit implements Writable {
070    private String logFileName;
071    private long fileSize;
072    private long startTime;
073    private long endTime;
074
075    /** for serialization */
076    public WALSplit() {
077    }
078
079    /**
080     * Represent an WALSplit, i.e. a single WAL file. Start- and EndTime are managed by the split,
081     * so that WAL files can be filtered before WALEdits are passed to the mapper(s).
082     */
083    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
084      this.logFileName = logFileName;
085      this.fileSize = fileSize;
086      this.startTime = startTime;
087      this.endTime = endTime;
088    }
089
090    @Override
091    public long getLength() throws IOException, InterruptedException {
092      return fileSize;
093    }
094
095    @Override
096    public String[] getLocations() throws IOException, InterruptedException {
097      // TODO: Find the data node with the most blocks for this WAL?
098      return new String[] {};
099    }
100
101    public String getLogFileName() {
102      return logFileName;
103    }
104
105    public long getStartTime() {
106      return startTime;
107    }
108
109    public long getEndTime() {
110      return endTime;
111    }
112
113    @Override
114    public void readFields(DataInput in) throws IOException {
115      logFileName = in.readUTF();
116      fileSize = in.readLong();
117      startTime = in.readLong();
118      endTime = in.readLong();
119    }
120
121    @Override
122    public void write(DataOutput out) throws IOException {
123      out.writeUTF(logFileName);
124      out.writeLong(fileSize);
125      out.writeLong(startTime);
126      out.writeLong(endTime);
127    }
128
129    @Override
130    public String toString() {
131      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
132    }
133  }
134
135  /**
136   * {@link RecordReader} for an {@link WAL} file. Implementation shared with deprecated
137   * HLogInputFormat.
138   */
139  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
140    private WALStreamReader reader = null;
141    // visible until we can remove the deprecated HLogInputFormat
142    Entry currentEntry = new Entry();
143    private long startTime;
144    private long endTime;
145    private Configuration conf;
146    private Path logFile;
147    private long currentPos;
148
149    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
150        justification = "HDFS-4380")
151    private WALStreamReader openReader(Path path, long startPosition) throws IOException {
152      long retryInterval = 2000; // 2 sec
153      int maxAttempts = 30;
154      int attempt = 0;
155      Exception ee = null;
156      WALStreamReader reader = null;
157      while (reader == null && attempt++ < maxAttempts) {
158        try {
159          // Detect if this is a new file, if so get a new reader else
160          // reset the current reader so that we see the new data
161          reader =
162            WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition);
163          return reader;
164        } catch (LeaseNotRecoveredException lnre) {
165          // HBASE-15019 the WAL was not closed due to some hiccup.
166          LOG.warn("Try to recover the WAL lease " + path, lnre);
167          AbstractFSWALProvider.recoverLease(conf, path);
168          reader = null;
169          ee = lnre;
170        } catch (NullPointerException npe) {
171          // Workaround for race condition in HDFS-4380
172          // which throws a NPE if we open a file before any data node has the most recent block
173          // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
174          LOG.warn("Got NPE opening reader, will retry.");
175          reader = null;
176          ee = npe;
177        }
178        if (reader == null) {
179          // sleep before next attempt
180          try {
181            Thread.sleep(retryInterval);
182          } catch (InterruptedException e) {
183            Thread.currentThread().interrupt();
184          }
185        }
186      }
187      throw new IOException("Could not open reader", ee);
188    }
189
190    @Override
191    public void initialize(InputSplit split, TaskAttemptContext context)
192      throws IOException, InterruptedException {
193      WALSplit hsplit = (WALSplit) split;
194      logFile = new Path(hsplit.getLogFileName());
195      conf = context.getConfiguration();
196      LOG.info("Opening {} for {}", logFile, split);
197      openReader(logFile);
198      this.startTime = hsplit.getStartTime();
199      this.endTime = hsplit.getEndTime();
200    }
201
202    private void openReader(Path path) throws IOException {
203      closeReader();
204      reader = openReader(path, currentPos > 0 ? currentPos : -1);
205      setCurrentPath(path);
206    }
207
208    private void setCurrentPath(Path path) {
209      this.logFile = path;
210    }
211
212    private void closeReader() throws IOException {
213      if (reader != null) {
214        reader.close();
215        reader = null;
216      }
217    }
218
219    @Override
220    public boolean nextKeyValue() throws IOException, InterruptedException {
221      if (reader == null) {
222        return false;
223      }
224      this.currentPos = reader.getPosition();
225      Entry temp;
226      long i = -1;
227      try {
228        do {
229          // skip older entries
230          try {
231            temp = reader.next(currentEntry);
232            i++;
233          } catch (EOFException x) {
234            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
235              + " (This is normal when a RegionServer crashed.)");
236            return false;
237          }
238        } while (temp != null && temp.getKey().getWriteTime() < startTime);
239
240        if (temp == null) {
241          if (i > 0) {
242            LOG.info("Skipped " + i + " entries.");
243          }
244          LOG.info("Reached end of file.");
245          return false;
246        } else if (i > 0) {
247          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
248        }
249        boolean res = temp.getKey().getWriteTime() <= endTime;
250        if (!res) {
251          LOG.info(
252            "Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
253        }
254        return res;
255      } catch (IOException e) {
256        Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
257        // archivedLog can be null if unable to locate in archiveDir.
258        if (archivedLog != null) {
259          openReader(archivedLog);
260          // Try call again in recursion
261          return nextKeyValue();
262        } else {
263          throw e;
264        }
265      }
266    }
267
268    @Override
269    public WALEdit getCurrentValue() throws IOException, InterruptedException {
270      return currentEntry.getEdit();
271    }
272
273    @Override
274    public float getProgress() throws IOException, InterruptedException {
275      // N/A depends on total number of entries, which is unknown
276      return 0;
277    }
278
279    @Override
280    public void close() throws IOException {
281      LOG.info("Closing reader");
282      if (reader != null) {
283        this.reader.close();
284      }
285    }
286  }
287
288  /**
289   * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer need to
290   * support HLogInputFormat.
291   */
292  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
293    @Override
294    public WALKey getCurrentKey() throws IOException, InterruptedException {
295      return currentEntry.getKey();
296    }
297  }
298
299  @Override
300  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
301    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
302  }
303
304  /**
305   * implementation shared with deprecated HLogInputFormat
306   */
307  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
308    throws IOException, InterruptedException {
309    Configuration conf = context.getConfiguration();
310    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
311    Path[] inputPaths = getInputPaths(conf);
312    // get delegation token for the filesystem
313    TokenCache.obtainTokensForNamenodes(context.getCredentials(), inputPaths, conf);
314    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
315    long endTime = conf.getLong(endKey, Long.MAX_VALUE);
316
317    List<FileStatus> allFiles = new ArrayList<FileStatus>();
318    for (Path inputPath : inputPaths) {
319      FileSystem fs = inputPath.getFileSystem(conf);
320      try {
321        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
322        allFiles.addAll(files);
323      } catch (FileNotFoundException e) {
324        if (ignoreMissing) {
325          LOG.warn("File " + inputPath + " is missing. Skipping it.");
326          continue;
327        }
328        throw e;
329      }
330    }
331    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
332    for (FileStatus file : allFiles) {
333      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
334    }
335    return splits;
336  }
337
338  private Path[] getInputPaths(Configuration conf) {
339    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
340    return StringUtils
341      .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
342  }
343
344  /**
345   * @param startTime If file looks like it has a timestamp in its name, we'll check if newer or
346   *                  equal to this value else we will filter out the file. If name does not seem to
347   *                  have a timestamp, we will just return it w/o filtering.
348   * @param endTime   If file looks like it has a timestamp in its name, we'll check if older or
349   *                  equal to this value else we will filter out the file. If name does not seem to
350   *                  have a timestamp, we will just return it w/o filtering.
351   */
352  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
353    throws IOException {
354    List<FileStatus> result = new ArrayList<>();
355    LOG.debug("Scanning " + dir.toString() + " for WAL files");
356    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
357    if (!iter.hasNext()) {
358      return Collections.emptyList();
359    }
360    while (iter.hasNext()) {
361      LocatedFileStatus file = iter.next();
362      if (file.isDirectory()) {
363        // Recurse into sub directories
364        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
365      } else {
366        addFile(result, file, startTime, endTime);
367      }
368    }
369    // TODO: These results should be sorted? Results could be content of recovered.edits directory
370    // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and
371    // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator
372    return result;
373  }
374
375  static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime,
376    long endTime) {
377    long timestamp = AbstractFSWALProvider.getTimestamp(lfs.getPath().getName());
378    if (timestamp > 0) {
379      // Looks like a valid timestamp.
380      if (timestamp <= endTime && timestamp >= startTime) {
381        LOG.info("Found {}", lfs.getPath());
382        result.add(lfs);
383      } else {
384        LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(), startTime,
385          Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime));
386      }
387    } else {
388      // If no timestamp, add it regardless.
389      LOG.info("Found (no-timestamp!) {}", lfs);
390      result.add(lfs);
391    }
392  }
393
394  @Override
395  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
396    TaskAttemptContext context) throws IOException, InterruptedException {
397    return new WALKeyRecordReader();
398  }
399}