001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.EOFException;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.time.Instant;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.RemoteIterator;
035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
036import org.apache.hadoop.hbase.wal.WAL;
037import org.apache.hadoop.hbase.wal.WAL.Entry;
038import org.apache.hadoop.hbase.wal.WAL.Reader;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.apache.hadoop.hbase.wal.WALKey;
041import org.apache.hadoop.io.Writable;
042import org.apache.hadoop.mapreduce.InputFormat;
043import org.apache.hadoop.mapreduce.InputSplit;
044import org.apache.hadoop.mapreduce.JobContext;
045import org.apache.hadoop.mapreduce.RecordReader;
046import org.apache.hadoop.mapreduce.TaskAttemptContext;
047import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
048import org.apache.hadoop.mapreduce.security.TokenCache;
049import org.apache.hadoop.util.StringUtils;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
056 */
057@InterfaceAudience.Public
058public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
059  private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class);
060
061  public static final String START_TIME_KEY = "wal.start.time";
062  public static final String END_TIME_KEY = "wal.end.time";
063
064  /**
065   * {@link InputSplit} for {@link WAL} files. Each split represent exactly one log file.
066   */
067  static class WALSplit extends InputSplit implements Writable {
068    private String logFileName;
069    private long fileSize;
070    private long startTime;
071    private long endTime;
072
073    /** for serialization */
074    public WALSplit() {
075    }
076
077    /**
078     * Represent an WALSplit, i.e. a single WAL file. Start- and EndTime are managed by the split,
079     * so that WAL files can be filtered before WALEdits are passed to the mapper(s).
080     */
081    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
082      this.logFileName = logFileName;
083      this.fileSize = fileSize;
084      this.startTime = startTime;
085      this.endTime = endTime;
086    }
087
088    @Override
089    public long getLength() throws IOException, InterruptedException {
090      return fileSize;
091    }
092
093    @Override
094    public String[] getLocations() throws IOException, InterruptedException {
095      // TODO: Find the data node with the most blocks for this WAL?
096      return new String[] {};
097    }
098
099    public String getLogFileName() {
100      return logFileName;
101    }
102
103    public long getStartTime() {
104      return startTime;
105    }
106
107    public long getEndTime() {
108      return endTime;
109    }
110
111    @Override
112    public void readFields(DataInput in) throws IOException {
113      logFileName = in.readUTF();
114      fileSize = in.readLong();
115      startTime = in.readLong();
116      endTime = in.readLong();
117    }
118
119    @Override
120    public void write(DataOutput out) throws IOException {
121      out.writeUTF(logFileName);
122      out.writeLong(fileSize);
123      out.writeLong(startTime);
124      out.writeLong(endTime);
125    }
126
127    @Override
128    public String toString() {
129      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
130    }
131  }
132
133  /**
134   * {@link RecordReader} for an {@link WAL} file. Implementation shared with deprecated
135   * HLogInputFormat.
136   */
137  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
138    private Reader reader = null;
139    // visible until we can remove the deprecated HLogInputFormat
140    Entry currentEntry = new Entry();
141    private long startTime;
142    private long endTime;
143    private Configuration conf;
144    private Path logFile;
145    private long currentPos;
146
147    @Override
148    public void initialize(InputSplit split, TaskAttemptContext context)
149      throws IOException, InterruptedException {
150      WALSplit hsplit = (WALSplit) split;
151      logFile = new Path(hsplit.getLogFileName());
152      conf = context.getConfiguration();
153      LOG.info("Opening {} for {}", logFile, split);
154      openReader(logFile);
155      this.startTime = hsplit.getStartTime();
156      this.endTime = hsplit.getEndTime();
157    }
158
159    private void openReader(Path path) throws IOException {
160      closeReader();
161      reader = AbstractFSWALProvider.openReader(path, conf);
162      seek();
163      setCurrentPath(path);
164    }
165
166    private void setCurrentPath(Path path) {
167      this.logFile = path;
168    }
169
170    private void closeReader() throws IOException {
171      if (reader != null) {
172        reader.close();
173        reader = null;
174      }
175    }
176
177    private void seek() throws IOException {
178      if (currentPos != 0) {
179        reader.seek(currentPos);
180      }
181    }
182
183    @Override
184    public boolean nextKeyValue() throws IOException, InterruptedException {
185      if (reader == null) {
186        return false;
187      }
188      this.currentPos = reader.getPosition();
189      Entry temp;
190      long i = -1;
191      try {
192        do {
193          // skip older entries
194          try {
195            temp = reader.next(currentEntry);
196            i++;
197          } catch (EOFException x) {
198            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
199              + " (This is normal when a RegionServer crashed.)");
200            return false;
201          }
202        } while (temp != null && temp.getKey().getWriteTime() < startTime);
203
204        if (temp == null) {
205          if (i > 0) {
206            LOG.info("Skipped " + i + " entries.");
207          }
208          LOG.info("Reached end of file.");
209          return false;
210        } else if (i > 0) {
211          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
212        }
213        boolean res = temp.getKey().getWriteTime() <= endTime;
214        if (!res) {
215          LOG.info(
216            "Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
217        }
218        return res;
219      } catch (IOException e) {
220        Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
221        // archivedLog can be null if unable to locate in archiveDir.
222        if (archivedLog != null) {
223          openReader(archivedLog);
224          // Try call again in recursion
225          return nextKeyValue();
226        } else {
227          throw e;
228        }
229      }
230    }
231
232    @Override
233    public WALEdit getCurrentValue() throws IOException, InterruptedException {
234      return currentEntry.getEdit();
235    }
236
237    @Override
238    public float getProgress() throws IOException, InterruptedException {
239      // N/A depends on total number of entries, which is unknown
240      return 0;
241    }
242
243    @Override
244    public void close() throws IOException {
245      LOG.info("Closing reader");
246      if (reader != null) {
247        this.reader.close();
248      }
249    }
250  }
251
252  /**
253   * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer need to
254   * support HLogInputFormat.
255   */
256  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
257    @Override
258    public WALKey getCurrentKey() throws IOException, InterruptedException {
259      return currentEntry.getKey();
260    }
261  }
262
263  @Override
264  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
265    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
266  }
267
268  /**
269   * implementation shared with deprecated HLogInputFormat
270   */
271  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
272    throws IOException, InterruptedException {
273    Configuration conf = context.getConfiguration();
274    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
275    Path[] inputPaths = getInputPaths(conf);
276    // get delegation token for the filesystem
277    TokenCache.obtainTokensForNamenodes(context.getCredentials(), inputPaths, conf);
278    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
279    long endTime = conf.getLong(endKey, Long.MAX_VALUE);
280
281    List<FileStatus> allFiles = new ArrayList<FileStatus>();
282    for (Path inputPath : inputPaths) {
283      FileSystem fs = inputPath.getFileSystem(conf);
284      try {
285        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
286        allFiles.addAll(files);
287      } catch (FileNotFoundException e) {
288        if (ignoreMissing) {
289          LOG.warn("File " + inputPath + " is missing. Skipping it.");
290          continue;
291        }
292        throw e;
293      }
294    }
295    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
296    for (FileStatus file : allFiles) {
297      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
298    }
299    return splits;
300  }
301
302  private Path[] getInputPaths(Configuration conf) {
303    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
304    return StringUtils
305      .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
306  }
307
308  /**
309   * @param startTime If file looks like it has a timestamp in its name, we'll check if newer or
310   *                  equal to this value else we will filter out the file. If name does not seem to
311   *                  have a timestamp, we will just return it w/o filtering.
312   * @param endTime   If file looks like it has a timestamp in its name, we'll check if older or
313   *                  equal to this value else we will filter out the file. If name does not seem to
314   *                  have a timestamp, we will just return it w/o filtering.
315   */
316  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
317    throws IOException {
318    List<FileStatus> result = new ArrayList<>();
319    LOG.debug("Scanning " + dir.toString() + " for WAL files");
320    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
321    if (!iter.hasNext()) {
322      return Collections.emptyList();
323    }
324    while (iter.hasNext()) {
325      LocatedFileStatus file = iter.next();
326      if (file.isDirectory()) {
327        // Recurse into sub directories
328        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
329      } else {
330        addFile(result, file, startTime, endTime);
331      }
332    }
333    // TODO: These results should be sorted? Results could be content of recovered.edits directory
334    // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and
335    // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator
336    return result;
337  }
338
339  static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime,
340    long endTime) {
341    long timestamp = AbstractFSWALProvider.getTimestamp(lfs.getPath().getName());
342    if (timestamp > 0) {
343      // Looks like a valid timestamp.
344      if (timestamp <= endTime && timestamp >= startTime) {
345        LOG.info("Found {}", lfs.getPath());
346        result.add(lfs);
347      } else {
348        LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(), startTime,
349          Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime));
350      }
351    } else {
352      // If no timestamp, add it regardless.
353      LOG.info("Found (no-timestamp!) {}", lfs);
354      result.add(lfs);
355    }
356  }
357
358  @Override
359  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
360    TaskAttemptContext context) throws IOException, InterruptedException {
361    return new WALKeyRecordReader();
362  }
363}