View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FileStatus;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36  import org.apache.hadoop.hbase.wal.WALFactory;
37  import org.apache.hadoop.hbase.wal.WALKey;
38  import org.apache.hadoop.hbase.wal.WAL.Entry;
39  import org.apache.hadoop.hbase.wal.WAL.Reader;
40  import org.apache.hadoop.io.Writable;
41  import org.apache.hadoop.mapreduce.InputFormat;
42  import org.apache.hadoop.mapreduce.InputSplit;
43  import org.apache.hadoop.mapreduce.JobContext;
44  import org.apache.hadoop.mapreduce.RecordReader;
45  import org.apache.hadoop.mapreduce.TaskAttemptContext;
46  
47  /**
48   * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
49   */
50  @InterfaceAudience.Public
51  public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
52    private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
53  
54    public static final String START_TIME_KEY = "wal.start.time";
55    public static final String END_TIME_KEY = "wal.end.time";
56  
57    /**
58     * {@link InputSplit} for {@link WAL} files. Each split represent
59     * exactly one log file.
60     */
61    static class WALSplit extends InputSplit implements Writable {
62      private String logFileName;
63      private long fileSize;
64      private long startTime;
65      private long endTime;
66  
67      /** for serialization */
68      public WALSplit() {}
69  
70      /**
71       * Represent an WALSplit, i.e. a single WAL file.
72       * Start- and EndTime are managed by the split, so that WAL files can be
73       * filtered before WALEdits are passed to the mapper(s).
74       * @param logFileName
75       * @param fileSize
76       * @param startTime
77       * @param endTime
78       */
79      public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
80        this.logFileName = logFileName;
81        this.fileSize = fileSize;
82        this.startTime = startTime;
83        this.endTime = endTime;
84      }
85  
86      @Override
87      public long getLength() throws IOException, InterruptedException {
88        return fileSize;
89      }
90  
91      @Override
92      public String[] getLocations() throws IOException, InterruptedException {
93        // TODO: Find the data node with the most blocks for this WAL?
94        return new String[] {};
95      }
96  
97      public String getLogFileName() {
98        return logFileName;
99      }
100 
101     public long getStartTime() {
102       return startTime;
103     }
104 
105     public long getEndTime() {
106       return endTime;
107     }
108 
109     @Override
110     public void readFields(DataInput in) throws IOException {
111       logFileName = in.readUTF();
112       fileSize = in.readLong();
113       startTime = in.readLong();
114       endTime = in.readLong();
115     }
116 
117     @Override
118     public void write(DataOutput out) throws IOException {
119       out.writeUTF(logFileName);
120       out.writeLong(fileSize);
121       out.writeLong(startTime);
122       out.writeLong(endTime);
123     }
124 
125     @Override
126     public String toString() {
127       return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
128     }
129   }
130 
131   /**
132    * {@link RecordReader} for an {@link WAL} file.
133    * Implementation shared with deprecated HLogInputFormat.
134    */
135   static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
136     private Reader reader = null;
137     // visible until we can remove the deprecated HLogInputFormat
138     Entry currentEntry = new Entry();
139     private long startTime;
140     private long endTime;
141 
142     @Override
143     public void initialize(InputSplit split, TaskAttemptContext context)
144         throws IOException, InterruptedException {
145       WALSplit hsplit = (WALSplit)split;
146       Path logFile = new Path(hsplit.getLogFileName());
147       Configuration conf = context.getConfiguration();
148       LOG.info("Opening reader for "+split);
149       try {
150         this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
151       } catch (EOFException x) {
152         LOG.info("Ignoring corrupted WAL file: " + logFile
153             + " (This is normal when a RegionServer crashed.)");
154         this.reader = null;
155       }
156       this.startTime = hsplit.getStartTime();
157       this.endTime = hsplit.getEndTime();
158     }
159 
160     @Override
161     public boolean nextKeyValue() throws IOException, InterruptedException {
162       if (reader == null) return false;
163 
164       Entry temp;
165       long i = -1;
166       do {
167         // skip older entries
168         try {
169           temp = reader.next(currentEntry);
170           i++;
171         } catch (EOFException x) {
172           LOG.info("Corrupted entry detected. Ignoring the rest of the file."
173               + " (This is normal when a RegionServer crashed.)");
174           return false;
175         }
176       }
177       while(temp != null && temp.getKey().getWriteTime() < startTime);
178 
179       if (temp == null) {
180         if (i > 0) LOG.info("Skipped " + i + " entries.");
181         LOG.info("Reached end of file.");
182         return false;
183       } else if (i > 0) {
184         LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
185       }
186       boolean res = temp.getKey().getWriteTime() <= endTime;
187       if (!res) {
188         LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
189       }
190       return res;
191     }
192 
193     @Override
194     public WALEdit getCurrentValue() throws IOException, InterruptedException {
195       return currentEntry.getEdit();
196     }
197 
198     @Override
199     public float getProgress() throws IOException, InterruptedException {
200       // N/A depends on total number of entries, which is unknown
201       return 0;
202     }
203 
204     @Override
205     public void close() throws IOException {
206       LOG.info("Closing reader");
207       if (reader != null) this.reader.close();
208     }
209   }
210 
211   /**
212    * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer
213    * need to support HLogInputFormat.
214    */
215   static class WALKeyRecordReader extends WALRecordReader<WALKey> {
216     @Override
217     public WALKey getCurrentKey() throws IOException, InterruptedException {
218       return currentEntry.getKey();
219     }
220   }
221 
222   @Override
223   public List<InputSplit> getSplits(JobContext context) throws IOException,
224       InterruptedException {
225     return getSplits(context, START_TIME_KEY, END_TIME_KEY);
226   }
227 
228   /**
229    * implementation shared with deprecated HLogInputFormat
230    */
231   List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
232       throws IOException, InterruptedException {
233     Configuration conf = context.getConfiguration();
234     Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir"));
235 
236     long startTime = conf.getLong(startKey, Long.MIN_VALUE);
237     long endTime = conf.getLong(endKey, Long.MAX_VALUE);
238 
239     FileSystem fs = inputDir.getFileSystem(conf);
240     List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
241 
242     List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
243     for (FileStatus file : files) {
244       splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
245     }
246     return splits;
247   }
248 
249   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
250       throws IOException {
251     List<FileStatus> result = new ArrayList<FileStatus>();
252     LOG.debug("Scanning " + dir.toString() + " for WAL files");
253 
254     FileStatus[] files = fs.listStatus(dir);
255     if (files == null) return Collections.emptyList();
256     for (FileStatus file : files) {
257       if (file.isDirectory()) {
258         // recurse into sub directories
259         result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
260       } else {
261         String name = file.getPath().toString();
262         int idx = name.lastIndexOf('.');
263         if (idx > 0) {
264           try {
265             long fileStartTime = Long.parseLong(name.substring(idx+1));
266             if (fileStartTime <= endTime) {
267               LOG.info("Found: " + name);
268               result.add(file);
269             }
270           } catch (NumberFormatException x) {
271             idx = 0;
272           }
273         }
274         if (idx == 0) {
275           LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
276         }
277       }
278     }
279     return result;
280   }
281 
282   @Override
283   public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
284       TaskAttemptContext context) throws IOException, InterruptedException {
285     return new WALKeyRecordReader();
286   }
287 }