001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.EOFException;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.RemoteIterator;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.hbase.wal.WALEdit;
039import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
040import org.apache.hadoop.hbase.wal.WAL;
041import org.apache.hadoop.hbase.wal.WAL.Entry;
042import org.apache.hadoop.hbase.wal.WAL.Reader;
043import org.apache.hadoop.hbase.wal.WALKey;
044import org.apache.hadoop.io.Writable;
045import org.apache.hadoop.mapreduce.InputFormat;
046import org.apache.hadoop.mapreduce.InputSplit;
047import org.apache.hadoop.mapreduce.JobContext;
048import org.apache.hadoop.mapreduce.RecordReader;
049import org.apache.hadoop.mapreduce.TaskAttemptContext;
050import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
051import org.apache.hadoop.util.StringUtils;
052
053/**
054 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
055 */
056@InterfaceAudience.Public
057public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
058  private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class);
059
060  public static final String START_TIME_KEY = "wal.start.time";
061  public static final String END_TIME_KEY = "wal.end.time";
062
063  /**
064   * {@link InputSplit} for {@link WAL} files. Each split represent
065   * exactly one log file.
066   */
067  static class WALSplit extends InputSplit implements Writable {
068    private String logFileName;
069    private long fileSize;
070    private long startTime;
071    private long endTime;
072
073    /** for serialization */
074    public WALSplit() {}
075
076    /**
077     * Represent an WALSplit, i.e. a single WAL file.
078     * Start- and EndTime are managed by the split, so that WAL files can be
079     * filtered before WALEdits are passed to the mapper(s).
080     * @param logFileName
081     * @param fileSize
082     * @param startTime
083     * @param endTime
084     */
085    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
086      this.logFileName = logFileName;
087      this.fileSize = fileSize;
088      this.startTime = startTime;
089      this.endTime = endTime;
090    }
091
092    @Override
093    public long getLength() throws IOException, InterruptedException {
094      return fileSize;
095    }
096
097    @Override
098    public String[] getLocations() throws IOException, InterruptedException {
099      // TODO: Find the data node with the most blocks for this WAL?
100      return new String[] {};
101    }
102
103    public String getLogFileName() {
104      return logFileName;
105    }
106
107    public long getStartTime() {
108      return startTime;
109    }
110
111    public long getEndTime() {
112      return endTime;
113    }
114
115    @Override
116    public void readFields(DataInput in) throws IOException {
117      logFileName = in.readUTF();
118      fileSize = in.readLong();
119      startTime = in.readLong();
120      endTime = in.readLong();
121    }
122
123    @Override
124    public void write(DataOutput out) throws IOException {
125      out.writeUTF(logFileName);
126      out.writeLong(fileSize);
127      out.writeLong(startTime);
128      out.writeLong(endTime);
129    }
130
131    @Override
132    public String toString() {
133      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
134    }
135  }
136
137  /**
138   * {@link RecordReader} for an {@link WAL} file.
139   * Implementation shared with deprecated HLogInputFormat.
140   */
141  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
142    private Reader reader = null;
143    // visible until we can remove the deprecated HLogInputFormat
144    Entry currentEntry = new Entry();
145    private long startTime;
146    private long endTime;
147    private Configuration conf;
148    private Path logFile;
149    private long currentPos;
150
151    @Override
152    public void initialize(InputSplit split, TaskAttemptContext context)
153        throws IOException, InterruptedException {
154      WALSplit hsplit = (WALSplit)split;
155      logFile = new Path(hsplit.getLogFileName());
156      conf = context.getConfiguration();
157      LOG.info("Opening reader for "+split);
158      openReader(logFile);
159      this.startTime = hsplit.getStartTime();
160      this.endTime = hsplit.getEndTime();
161    }
162
163    private void openReader(Path path) throws IOException
164    {
165      closeReader();
166      reader = AbstractFSWALProvider.openReader(path, conf);
167      seek();
168      setCurrentPath(path);
169    }
170
171    private void setCurrentPath(Path path) {
172      this.logFile = path;
173    }
174
175    private void closeReader() throws IOException {
176      if (reader != null) {
177        reader.close();
178        reader = null;
179      }
180    }
181
182    private void seek() throws IOException {
183      if (currentPos != 0) {
184        reader.seek(currentPos);
185      }
186    }
187
188    @Override
189    public boolean nextKeyValue() throws IOException, InterruptedException {
190      if (reader == null) return false;
191      this.currentPos = reader.getPosition();
192      Entry temp;
193      long i = -1;
194      try {
195        do {
196          // skip older entries
197          try {
198            temp = reader.next(currentEntry);
199            i++;
200          } catch (EOFException x) {
201            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
202                + " (This is normal when a RegionServer crashed.)");
203            return false;
204          }
205        } while (temp != null && temp.getKey().getWriteTime() < startTime);
206
207        if (temp == null) {
208          if (i > 0) LOG.info("Skipped " + i + " entries.");
209          LOG.info("Reached end of file.");
210          return false;
211        } else if (i > 0) {
212          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
213        }
214        boolean res = temp.getKey().getWriteTime() <= endTime;
215        if (!res) {
216          LOG.info("Reached ts: " + temp.getKey().getWriteTime()
217              + " ignoring the rest of the file.");
218        }
219        return res;
220      } catch (IOException e) {
221        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
222        if (logFile != archivedLog) {
223          openReader(archivedLog);
224          // Try call again in recursion
225          return nextKeyValue();
226        } else {
227          throw e;
228        }
229      }
230    }
231
232    @Override
233    public WALEdit getCurrentValue() throws IOException, InterruptedException {
234      return currentEntry.getEdit();
235    }
236
237    @Override
238    public float getProgress() throws IOException, InterruptedException {
239      // N/A depends on total number of entries, which is unknown
240      return 0;
241    }
242
243    @Override
244    public void close() throws IOException {
245      LOG.info("Closing reader");
246      if (reader != null) this.reader.close();
247    }
248  }
249
250  /**
251   * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer
252   * need to support HLogInputFormat.
253   */
254  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
255    @Override
256    public WALKey getCurrentKey() throws IOException, InterruptedException {
257      return currentEntry.getKey();
258    }
259  }
260
261  @Override
262  public List<InputSplit> getSplits(JobContext context) throws IOException,
263      InterruptedException {
264    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
265  }
266
267  /**
268   * implementation shared with deprecated HLogInputFormat
269   */
270  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
271      throws IOException, InterruptedException {
272    Configuration conf = context.getConfiguration();
273    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
274    Path[] inputPaths = getInputPaths(conf);
275    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
276    long endTime = conf.getLong(endKey, Long.MAX_VALUE);
277
278    List<FileStatus> allFiles = new ArrayList<FileStatus>();
279    for(Path inputPath: inputPaths){
280      FileSystem fs = inputPath.getFileSystem(conf);
281      try {
282        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
283        allFiles.addAll(files);
284      } catch (FileNotFoundException e) {
285        if (ignoreMissing) {
286          LOG.warn("File "+ inputPath +" is missing. Skipping it.");
287          continue;
288        }
289        throw e;
290      }
291    }
292    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
293    for (FileStatus file : allFiles) {
294      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
295    }
296    return splits;
297  }
298
299  private Path[] getInputPaths(Configuration conf) {
300    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
301    return StringUtils.stringToPath(
302      inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
303  }
304
305  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
306      throws IOException {
307    List<FileStatus> result = new ArrayList<>();
308    LOG.debug("Scanning " + dir.toString() + " for WAL files");
309
310    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
311    if (!iter.hasNext()) return Collections.emptyList();
312    while (iter.hasNext()) {
313      LocatedFileStatus file = iter.next();
314      if (file.isDirectory()) {
315        // recurse into sub directories
316        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
317      } else {
318        String name = file.getPath().toString();
319        int idx = name.lastIndexOf('.');
320        if (idx > 0) {
321          try {
322            long fileStartTime = Long.parseLong(name.substring(idx+1));
323            if (fileStartTime <= endTime) {
324              LOG.info("Found: " + file);
325              result.add(file);
326            }
327          } catch (NumberFormatException x) {
328            idx = 0;
329          }
330        }
331        if (idx == 0) {
332          LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
333        }
334      }
335    }
336    return result;
337  }
338
339  @Override
340  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
341      TaskAttemptContext context) throws IOException, InterruptedException {
342    return new WALKeyRecordReader();
343  }
344}