View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FileStatus;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.regionserver.wal.HLog;
36  import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
37  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
38  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
39  import org.apache.hadoop.io.Writable;
40  import org.apache.hadoop.mapreduce.InputFormat;
41  import org.apache.hadoop.mapreduce.InputSplit;
42  import org.apache.hadoop.mapreduce.JobContext;
43  import org.apache.hadoop.mapreduce.RecordReader;
44  import org.apache.hadoop.mapreduce.TaskAttemptContext;
45  
46  /**
47   * Simple {@link InputFormat} for {@link HLog} files.
48   */
49  @InterfaceAudience.Public
50  public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
51    private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);
52  
53    public static final String START_TIME_KEY = "hlog.start.time";
54    public static final String END_TIME_KEY = "hlog.end.time";
55  
56    /**
57     * {@link InputSplit} for {@link HLog} files. Each split represent
58     * exactly one log file.
59     */
60    static class HLogSplit extends InputSplit implements Writable {
61      private String logFileName;
62      private long fileSize;
63      private long startTime;
64      private long endTime;
65  
66      /** for serialization */
67      public HLogSplit() {}
68  
69      /**
70       * Represent an HLogSplit, i.e. a single HLog file.
71       * Start- and EndTime are managed by the split, so that HLog files can be
72       * filtered before WALEdits are passed to the mapper(s).
73       * @param logFileName
74       * @param fileSize
75       * @param startTime
76       * @param endTime
77       */
78      public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) {
79        this.logFileName = logFileName;
80        this.fileSize = fileSize;
81        this.startTime = startTime;
82        this.endTime = endTime;
83      }
84  
85      @Override
86      public long getLength() throws IOException, InterruptedException {
87        return fileSize;
88      }
89  
90      @Override
91      public String[] getLocations() throws IOException, InterruptedException {
92        // TODO: Find the data node with the most blocks for this HLog?
93        return new String[] {};
94      }
95  
96      public String getLogFileName() {
97        return logFileName;
98      }
99  
100     public long getStartTime() {
101       return startTime;
102     }
103 
104     public long getEndTime() {
105       return endTime;
106     }
107 
108     @Override
109     public void readFields(DataInput in) throws IOException {
110       logFileName = in.readUTF();
111       fileSize = in.readLong();
112       startTime = in.readLong();
113       endTime = in.readLong();
114     }
115 
116     @Override
117     public void write(DataOutput out) throws IOException {
118       out.writeUTF(logFileName);
119       out.writeLong(fileSize);
120       out.writeLong(startTime);
121       out.writeLong(endTime);
122     }
123 
124     @Override
125     public String toString() {
126       return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
127     }
128   }
129 
130   /**
131    * {@link RecordReader} for an {@link HLog} file.
132    */
133   static class HLogRecordReader extends RecordReader<HLogKey, WALEdit> {
134     private HLog.Reader reader = null;
135     private HLog.Entry currentEntry = new HLog.Entry();
136     private long startTime;
137     private long endTime;
138 
139     @Override
140     public void initialize(InputSplit split, TaskAttemptContext context)
141         throws IOException, InterruptedException {
142       HLogSplit hsplit = (HLogSplit)split;
143       Path logFile = new Path(hsplit.getLogFileName());
144       Configuration conf = context.getConfiguration();
145       LOG.info("Opening reader for "+split);
146       try {
147         this.reader = HLogFactory.createReader(logFile.getFileSystem(conf), 
148             logFile, conf);
149       } catch (EOFException x) {
150         LOG.info("Ignoring corrupted HLog file: " + logFile
151             + " (This is normal when a RegionServer crashed.)");
152       }
153       this.startTime = hsplit.getStartTime();
154       this.endTime = hsplit.getEndTime();
155     }
156 
157     @Override
158     public boolean nextKeyValue() throws IOException, InterruptedException {
159       if (reader == null) return false;
160 
161       HLog.Entry temp;
162       long i = -1;
163       do {
164         // skip older entries
165         try {
166           temp = reader.next(currentEntry);
167           i++;
168         } catch (EOFException x) {
169           LOG.info("Corrupted entry detected. Ignoring the rest of the file."
170               + " (This is normal when a RegionServer crashed.)");
171           return false;
172         }
173       }
174       while(temp != null && temp.getKey().getWriteTime() < startTime);
175 
176       if (temp == null) {
177         if (i > 0) LOG.info("Skipped " + i + " entries.");
178         LOG.info("Reached end of file.");
179         return false;
180       } else if (i > 0) {
181         LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
182       }
183       boolean res = temp.getKey().getWriteTime() <= endTime;
184       if (!res) {
185         LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
186       }
187       return res;
188     }
189 
190     @Override
191     public HLogKey getCurrentKey() throws IOException, InterruptedException {
192       return currentEntry.getKey();
193     }
194 
195     @Override
196     public WALEdit getCurrentValue() throws IOException, InterruptedException {
197       return currentEntry.getEdit();
198     }
199 
200     @Override
201     public float getProgress() throws IOException, InterruptedException {
202       // N/A depends on total number of entries, which is unknown
203       return 0;
204     }
205 
206     @Override
207     public void close() throws IOException {
208       LOG.info("Closing reader");
209       if (reader != null) this.reader.close();
210     }
211   }
212 
213   @Override
214   public List<InputSplit> getSplits(JobContext context) throws IOException,
215       InterruptedException {
216     Configuration conf = context.getConfiguration();
217     Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir"));
218 
219     long startTime = conf.getLong(START_TIME_KEY, Long.MIN_VALUE);
220     long endTime = conf.getLong(END_TIME_KEY, Long.MAX_VALUE);
221 
222     FileSystem fs = inputDir.getFileSystem(conf);
223     List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
224 
225     List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
226     for (FileStatus file : files) {
227       splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
228     }
229     return splits;
230   }
231 
232   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
233       throws IOException {
234     List<FileStatus> result = new ArrayList<FileStatus>();
235     LOG.debug("Scanning " + dir.toString() + " for HLog files");
236 
237     FileStatus[] files = fs.listStatus(dir);
238     if (files == null) return Collections.emptyList();
239     for (FileStatus file : files) {
240       if (file.isDirectory()) {
241         // recurse into sub directories
242         result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
243       } else {
244         String name = file.getPath().toString();
245         int idx = name.lastIndexOf('.');
246         if (idx > 0) {
247           try {
248             long fileStartTime = Long.parseLong(name.substring(idx+1));
249             if (fileStartTime <= endTime) {
250               LOG.info("Found: " + name);
251               result.add(file);
252             }
253           } catch (NumberFormatException x) {
254             idx = 0;
255           }
256         }
257         if (idx == 0) {
258           LOG.warn("File " + name + " does not appear to be an HLog file. Skipping...");
259         }
260       }
261     }
262     return result;
263   }
264 
265   @Override
266   public RecordReader<HLogKey, WALEdit> createRecordReader(InputSplit split,
267       TaskAttemptContext context) throws IOException, InterruptedException {
268     return new HLogRecordReader();
269   }
270 }