001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.EOFException;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.time.Instant;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.RemoteIterator;
035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
036import org.apache.hadoop.hbase.wal.WAL;
037import org.apache.hadoop.hbase.wal.WAL.Entry;
038import org.apache.hadoop.hbase.wal.WAL.Reader;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.apache.hadoop.hbase.wal.WALKey;
041import org.apache.hadoop.io.Writable;
042import org.apache.hadoop.mapreduce.InputFormat;
043import org.apache.hadoop.mapreduce.InputSplit;
044import org.apache.hadoop.mapreduce.JobContext;
045import org.apache.hadoop.mapreduce.RecordReader;
046import org.apache.hadoop.mapreduce.TaskAttemptContext;
047import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
048import org.apache.hadoop.util.StringUtils;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
055 */
056@InterfaceAudience.Public
057public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
058  private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class);
059
060  public static final String START_TIME_KEY = "wal.start.time";
061  public static final String END_TIME_KEY = "wal.end.time";
062
063  /**
064   * {@link InputSplit} for {@link WAL} files. Each split represent exactly one log file.
065   */
066  static class WALSplit extends InputSplit implements Writable {
067    private String logFileName;
068    private long fileSize;
069    private long startTime;
070    private long endTime;
071
072    /** for serialization */
073    public WALSplit() {
074    }
075
076    /**
077     * Represent an WALSplit, i.e. a single WAL file. Start- and EndTime are managed by the split,
078     * so that WAL files can be filtered before WALEdits are passed to the mapper(s).
079     */
080    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
081      this.logFileName = logFileName;
082      this.fileSize = fileSize;
083      this.startTime = startTime;
084      this.endTime = endTime;
085    }
086
087    @Override
088    public long getLength() throws IOException, InterruptedException {
089      return fileSize;
090    }
091
092    @Override
093    public String[] getLocations() throws IOException, InterruptedException {
094      // TODO: Find the data node with the most blocks for this WAL?
095      return new String[] {};
096    }
097
098    public String getLogFileName() {
099      return logFileName;
100    }
101
102    public long getStartTime() {
103      return startTime;
104    }
105
106    public long getEndTime() {
107      return endTime;
108    }
109
110    @Override
111    public void readFields(DataInput in) throws IOException {
112      logFileName = in.readUTF();
113      fileSize = in.readLong();
114      startTime = in.readLong();
115      endTime = in.readLong();
116    }
117
118    @Override
119    public void write(DataOutput out) throws IOException {
120      out.writeUTF(logFileName);
121      out.writeLong(fileSize);
122      out.writeLong(startTime);
123      out.writeLong(endTime);
124    }
125
126    @Override
127    public String toString() {
128      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
129    }
130  }
131
132  /**
133   * {@link RecordReader} for an {@link WAL} file. Implementation shared with deprecated
134   * HLogInputFormat.
135   */
136  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
137    private Reader reader = null;
138    // visible until we can remove the deprecated HLogInputFormat
139    Entry currentEntry = new Entry();
140    private long startTime;
141    private long endTime;
142    private Configuration conf;
143    private Path logFile;
144    private long currentPos;
145
146    @Override
147    public void initialize(InputSplit split, TaskAttemptContext context)
148      throws IOException, InterruptedException {
149      WALSplit hsplit = (WALSplit) split;
150      logFile = new Path(hsplit.getLogFileName());
151      conf = context.getConfiguration();
152      LOG.info("Opening {} for {}", logFile, split);
153      openReader(logFile);
154      this.startTime = hsplit.getStartTime();
155      this.endTime = hsplit.getEndTime();
156    }
157
158    private void openReader(Path path) throws IOException {
159      closeReader();
160      reader = AbstractFSWALProvider.openReader(path, conf);
161      seek();
162      setCurrentPath(path);
163    }
164
165    private void setCurrentPath(Path path) {
166      this.logFile = path;
167    }
168
169    private void closeReader() throws IOException {
170      if (reader != null) {
171        reader.close();
172        reader = null;
173      }
174    }
175
176    private void seek() throws IOException {
177      if (currentPos != 0) {
178        reader.seek(currentPos);
179      }
180    }
181
182    @Override
183    public boolean nextKeyValue() throws IOException, InterruptedException {
184      if (reader == null) {
185        return false;
186      }
187      this.currentPos = reader.getPosition();
188      Entry temp;
189      long i = -1;
190      try {
191        do {
192          // skip older entries
193          try {
194            temp = reader.next(currentEntry);
195            i++;
196          } catch (EOFException x) {
197            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
198              + " (This is normal when a RegionServer crashed.)");
199            return false;
200          }
201        } while (temp != null && temp.getKey().getWriteTime() < startTime);
202
203        if (temp == null) {
204          if (i > 0) {
205            LOG.info("Skipped " + i + " entries.");
206          }
207          LOG.info("Reached end of file.");
208          return false;
209        } else if (i > 0) {
210          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
211        }
212        boolean res = temp.getKey().getWriteTime() <= endTime;
213        if (!res) {
214          LOG.info(
215            "Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
216        }
217        return res;
218      } catch (IOException e) {
219        Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
220        // archivedLog can be null if unable to locate in archiveDir.
221        if (archivedLog != null) {
222          openReader(archivedLog);
223          // Try call again in recursion
224          return nextKeyValue();
225        } else {
226          throw e;
227        }
228      }
229    }
230
231    @Override
232    public WALEdit getCurrentValue() throws IOException, InterruptedException {
233      return currentEntry.getEdit();
234    }
235
236    @Override
237    public float getProgress() throws IOException, InterruptedException {
238      // N/A depends on total number of entries, which is unknown
239      return 0;
240    }
241
242    @Override
243    public void close() throws IOException {
244      LOG.info("Closing reader");
245      if (reader != null) {
246        this.reader.close();
247      }
248    }
249  }
250
251  /**
252   * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer need to
253   * support HLogInputFormat.
254   */
255  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
256    @Override
257    public WALKey getCurrentKey() throws IOException, InterruptedException {
258      return currentEntry.getKey();
259    }
260  }
261
262  @Override
263  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
264    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
265  }
266
267  /**
268   * implementation shared with deprecated HLogInputFormat
269   */
270  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
271    throws IOException, InterruptedException {
272    Configuration conf = context.getConfiguration();
273    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
274    Path[] inputPaths = getInputPaths(conf);
275    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
276    long endTime = conf.getLong(endKey, Long.MAX_VALUE);
277
278    List<FileStatus> allFiles = new ArrayList<FileStatus>();
279    for (Path inputPath : inputPaths) {
280      FileSystem fs = inputPath.getFileSystem(conf);
281      try {
282        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
283        allFiles.addAll(files);
284      } catch (FileNotFoundException e) {
285        if (ignoreMissing) {
286          LOG.warn("File " + inputPath + " is missing. Skipping it.");
287          continue;
288        }
289        throw e;
290      }
291    }
292    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
293    for (FileStatus file : allFiles) {
294      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
295    }
296    return splits;
297  }
298
299  private Path[] getInputPaths(Configuration conf) {
300    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
301    return StringUtils
302      .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
303  }
304
305  /**
306   * @param startTime If file looks like it has a timestamp in its name, we'll check if newer or
307   *                  equal to this value else we will filter out the file. If name does not seem to
308   *                  have a timestamp, we will just return it w/o filtering.
309   * @param endTime   If file looks like it has a timestamp in its name, we'll check if older or
310   *                  equal to this value else we will filter out the file. If name does not seem to
311   *                  have a timestamp, we will just return it w/o filtering.
312   */
313  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
314    throws IOException {
315    List<FileStatus> result = new ArrayList<>();
316    LOG.debug("Scanning " + dir.toString() + " for WAL files");
317    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
318    if (!iter.hasNext()) {
319      return Collections.emptyList();
320    }
321    while (iter.hasNext()) {
322      LocatedFileStatus file = iter.next();
323      if (file.isDirectory()) {
324        // Recurse into sub directories
325        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
326      } else {
327        addFile(result, file, startTime, endTime);
328      }
329    }
330    // TODO: These results should be sorted? Results could be content of recovered.edits directory
331    // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and
332    // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator
333    return result;
334  }
335
336  static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime,
337    long endTime) {
338    long timestamp = WAL.getTimestamp(lfs.getPath().getName());
339    if (timestamp > 0) {
340      // Looks like a valid timestamp.
341      if (timestamp <= endTime && timestamp >= startTime) {
342        LOG.info("Found {}", lfs.getPath());
343        result.add(lfs);
344      } else {
345        LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(), startTime,
346          Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime));
347      }
348    } else {
349      // If no timestamp, add it regardless.
350      LOG.info("Found (no-timestamp!) {}", lfs);
351      result.add(lfs);
352    }
353  }
354
355  @Override
356  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
357    TaskAttemptContext context) throws IOException, InterruptedException {
358    return new WALKeyRecordReader();
359  }
360}