/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Simple {@link InputFormat} for {@link HLog} files.
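 * <p>
 * A minimal job-setup sketch; the input path, timestamps, and job name
 * below are illustrative, not prescribed by this class:
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set("mapred.input.dir", "/hbase/.logs");   // directory scanned by getSplits()
 * conf.setLong(HLogInputFormat.START_TIME_KEY, startTs);
 * conf.setLong(HLogInputFormat.END_TIME_KEY, endTs);
 * Job job = new Job(conf, "scan-hlog-edits");
 * job.setInputFormatClass(HLogInputFormat.class);
 * </pre>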
 */
public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
  private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);

  /** Configuration key for the earliest write time (inclusive) of edits to include. */
  public static final String START_TIME_KEY = "hlog.start.time";
  /** Configuration key for the latest write time (inclusive) of edits to include. */
  public static final String END_TIME_KEY = "hlog.end.time";

  /**
   * {@link InputSplit} for {@link HLog} files. Each split represents
   * exactly one log file.
   */
  static class HLogSplit extends InputSplit implements Writable {
    private String logFileName;
    private long fileSize;
    private long startTime;
    private long endTime;

    /** Default constructor, required for Writable deserialization. */
    public HLogSplit() {}

    /**
     * Represents a single HLog file.
     * The start and end times are carried by the split, so that HLog files
     * can be filtered before WALEdits are passed to the mapper(s).
     * @param logFileName path of the log file covered by this split
     * @param fileSize size of the log file in bytes
     * @param startTime earliest write time (inclusive) of edits to pass on
     * @param endTime latest write time (inclusive) of edits to pass on
     */
    public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) {
      this.logFileName = logFileName;
      this.fileSize = fileSize;
      this.startTime = startTime;
      this.endTime = endTime;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return fileSize;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      // TODO: Find the data node with the most blocks for this HLog?
      return new String[] {};
    }

    public String getLogFileName() {
      return logFileName;
    }

    public long getStartTime() {
      return startTime;
    }

    public long getEndTime() {
      return endTime;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      logFileName = in.readUTF();
      fileSize = in.readLong();
      startTime = in.readLong();
      endTime = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(logFileName);
      out.writeLong(fileSize);
      out.writeLong(startTime);
      out.writeLong(endTime);
    }

    @Override
    public String toString() {
      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
    }
  }

  /**
   * {@link RecordReader} for an {@link HLog} file.
   */
  static class HLogRecordReader extends RecordReader<HLogKey, WALEdit> {
    private HLog.Reader reader = null;
    private HLog.Entry currentEntry = new HLog.Entry();
    private long startTime;
    private long endTime;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      HLogSplit hsplit = (HLogSplit)split;
      Path logFile = new Path(hsplit.getLogFileName());
      Configuration conf = context.getConfiguration();
      LOG.info("Opening reader for " + split);
      try {
        this.reader = HLog.getReader(logFile.getFileSystem(conf), logFile, conf);
      } catch (EOFException x) {
        LOG.info("Ignoring corrupted HLog file: " + logFile
            + " (This is normal when a RegionServer crashes.)");
      }
      this.startTime = hsplit.getStartTime();
      this.endTime = hsplit.getEndTime();
    }

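    /**
     * Advances to the next entry whose write time falls within
     * [startTime, endTime]. Entries older than startTime are skipped, and
     * the first entry past endTime ends the split: the reader relies on
     * entries appearing in increasing write-time order within a file.
     */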
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (reader == null) return false;

      HLog.Entry temp;
      long i = -1;
      do {
        // skip entries older than the requested start time
        try {
          temp = reader.next(currentEntry);
          i++;
        } catch (EOFException x) {
          LOG.info("Corrupted entry detected. Ignoring the rest of the file."
              + " (This is normal when a RegionServer crashes.)");
          return false;
        }
      } while (temp != null && temp.getKey().getWriteTime() < startTime);

      if (temp == null) {
        if (i > 0) LOG.info("Skipped " + i + " entries.");
        LOG.info("Reached end of file.");
        return false;
      } else if (i > 0) {
        LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
      }
      boolean res = temp.getKey().getWriteTime() <= endTime;
      if (!res) {
        LOG.info("Reached ts: " + temp.getKey().getWriteTime() + ", ignoring the rest of the file.");
      }
      return res;
    }

    @Override
    public HLogKey getCurrentKey() throws IOException, InterruptedException {
      return currentEntry.getKey();
    }

    @Override
    public WALEdit getCurrentValue() throws IOException, InterruptedException {
      return currentEntry.getEdit();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // N/A: progress depends on the total number of entries, which is unknown
      return 0;
    }

    @Override
    public void close() throws IOException {
      LOG.info("Closing reader");
      if (reader != null) this.reader.close();
    }
  }

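  /**
   * Creates one split per HLog file found (recursively) under the directory
   * configured as {@code mapred.input.dir}, carrying the configured start
   * and end times so the record readers can filter edits.
   */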
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException,
      InterruptedException {
    Configuration conf = context.getConfiguration();
    Path inputDir = new Path(conf.get("mapred.input.dir"));

    long startTime = conf.getLong(START_TIME_KEY, Long.MIN_VALUE);
    long endTime = conf.getLong(END_TIME_KEY, Long.MAX_VALUE);

    FileSystem fs = inputDir.getFileSystem(conf);
    List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);

    List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
    for (FileStatus file : files) {
      splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
    }
    return splits;
  }

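  /**
   * Recursively collects HLog files under {@code dir}. A file qualifies when
   * its name ends in a numeric timestamp no later than {@code endTime}. The
   * start time is not checked here, because a file named with an early
   * timestamp can still contain edits written later.
   */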
  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
      throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    LOG.debug("Scanning " + dir.toString() + " for HLog files");

    FileStatus[] files = fs.listStatus(dir);
    if (files == null) return Collections.emptyList();
    for (FileStatus file : files) {
      if (file.isDir()) {
        // recurse into subdirectories
        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
      } else {
        String name = file.getPath().toString();
        int idx = name.lastIndexOf('.');
        boolean isLogFile = false;
        if (idx > 0) {
          try {
            // HLog file names end in the time (in ms) at which the file was created
            long fileStartTime = Long.parseLong(name.substring(idx + 1));
            isLogFile = true;
            if (fileStartTime <= endTime) {
              LOG.info("Found: " + name);
              result.add(file);
            }
          } catch (NumberFormatException x) {
            // suffix is not a timestamp; fall through and warn below
          }
        }
        if (!isLogFile) {
          LOG.warn("File " + name + " does not appear to be an HLog file. Skipping...");
        }
      }
    }
    return result;
  }

  @Override
  public RecordReader<HLogKey, WALEdit> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new HLogRecordReader();
  }
}