1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileStatus;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36 import org.apache.hadoop.hbase.wal.WALFactory;
37 import org.apache.hadoop.hbase.wal.WALKey;
38 import org.apache.hadoop.hbase.wal.WAL.Entry;
39 import org.apache.hadoop.hbase.wal.WAL.Reader;
40 import org.apache.hadoop.io.Writable;
41 import org.apache.hadoop.mapreduce.InputFormat;
42 import org.apache.hadoop.mapreduce.InputSplit;
43 import org.apache.hadoop.mapreduce.JobContext;
44 import org.apache.hadoop.mapreduce.RecordReader;
45 import org.apache.hadoop.mapreduce.TaskAttemptContext;
46
47
48
49
50 @InterfaceAudience.Public
51 public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
// Logger used by this input format and its nested split/record-reader classes.
52 private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
53
// Configuration key read (as a long) by getSplits(JobContext); entries whose write time is
// smaller than this value are skipped by the record reader. Falls back to Long.MIN_VALUE
// when the key is not set.
54 public static final String START_TIME_KEY = "wal.start.time";
// Configuration key read (as a long) by getSplits(JobContext); the record reader stops at the
// first entry whose write time is greater than this value. Falls back to Long.MAX_VALUE
// when the key is not set.
55 public static final String END_TIME_KEY = "wal.end.time";
56
57
58
59
60
61 static class WALSplit extends InputSplit implements Writable {
62 private String logFileName;
63 private long fileSize;
64 private long startTime;
65 private long endTime;
66
67
68 public WALSplit() {}
69
70
71
72
73
74
75
76
77
78
79 public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
80 this.logFileName = logFileName;
81 this.fileSize = fileSize;
82 this.startTime = startTime;
83 this.endTime = endTime;
84 }
85
86 @Override
87 public long getLength() throws IOException, InterruptedException {
88 return fileSize;
89 }
90
91 @Override
92 public String[] getLocations() throws IOException, InterruptedException {
93
94 return new String[] {};
95 }
96
97 public String getLogFileName() {
98 return logFileName;
99 }
100
101 public long getStartTime() {
102 return startTime;
103 }
104
105 public long getEndTime() {
106 return endTime;
107 }
108
109 @Override
110 public void readFields(DataInput in) throws IOException {
111 logFileName = in.readUTF();
112 fileSize = in.readLong();
113 startTime = in.readLong();
114 endTime = in.readLong();
115 }
116
117 @Override
118 public void write(DataOutput out) throws IOException {
119 out.writeUTF(logFileName);
120 out.writeLong(fileSize);
121 out.writeLong(startTime);
122 out.writeLong(endTime);
123 }
124
125 @Override
126 public String toString() {
127 return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
128 }
129 }
130
131
132
133
134
135 static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
136 private Reader reader = null;
137
138 Entry currentEntry = new Entry();
139 private long startTime;
140 private long endTime;
141
142 @Override
143 public void initialize(InputSplit split, TaskAttemptContext context)
144 throws IOException, InterruptedException {
145 WALSplit hsplit = (WALSplit)split;
146 Path logFile = new Path(hsplit.getLogFileName());
147 Configuration conf = context.getConfiguration();
148 LOG.info("Opening reader for "+split);
149 try {
150 this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
151 } catch (EOFException x) {
152 LOG.info("Ignoring corrupted WAL file: " + logFile
153 + " (This is normal when a RegionServer crashed.)");
154 this.reader = null;
155 }
156 this.startTime = hsplit.getStartTime();
157 this.endTime = hsplit.getEndTime();
158 }
159
160 @Override
161 public boolean nextKeyValue() throws IOException, InterruptedException {
162 if (reader == null) return false;
163
164 Entry temp;
165 long i = -1;
166 do {
167
168 try {
169 temp = reader.next(currentEntry);
170 i++;
171 } catch (EOFException x) {
172 LOG.info("Corrupted entry detected. Ignoring the rest of the file."
173 + " (This is normal when a RegionServer crashed.)");
174 return false;
175 }
176 }
177 while(temp != null && temp.getKey().getWriteTime() < startTime);
178
179 if (temp == null) {
180 if (i > 0) LOG.info("Skipped " + i + " entries.");
181 LOG.info("Reached end of file.");
182 return false;
183 } else if (i > 0) {
184 LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
185 }
186 boolean res = temp.getKey().getWriteTime() <= endTime;
187 if (!res) {
188 LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
189 }
190 return res;
191 }
192
193 @Override
194 public WALEdit getCurrentValue() throws IOException, InterruptedException {
195 return currentEntry.getEdit();
196 }
197
198 @Override
199 public float getProgress() throws IOException, InterruptedException {
200
201 return 0;
202 }
203
204 @Override
205 public void close() throws IOException {
206 LOG.info("Closing reader");
207 if (reader != null) this.reader.close();
208 }
209 }
210
211
212
213
214
215 static class WALKeyRecordReader extends WALRecordReader<WALKey> {
216 @Override
217 public WALKey getCurrentKey() throws IOException, InterruptedException {
218 return currentEntry.getKey();
219 }
220 }
221
222 @Override
223 public List<InputSplit> getSplits(JobContext context) throws IOException,
224 InterruptedException {
225 return getSplits(context, START_TIME_KEY, END_TIME_KEY);
226 }
227
228
229
230
231 List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
232 throws IOException, InterruptedException {
233 Configuration conf = context.getConfiguration();
234 Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir"));
235
236 long startTime = conf.getLong(startKey, Long.MIN_VALUE);
237 long endTime = conf.getLong(endKey, Long.MAX_VALUE);
238
239 FileSystem fs = inputDir.getFileSystem(conf);
240 List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
241
242 List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
243 for (FileStatus file : files) {
244 splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
245 }
246 return splits;
247 }
248
249 private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
250 throws IOException {
251 List<FileStatus> result = new ArrayList<FileStatus>();
252 LOG.debug("Scanning " + dir.toString() + " for WAL files");
253
254 FileStatus[] files = fs.listStatus(dir);
255 if (files == null) return Collections.emptyList();
256 for (FileStatus file : files) {
257 if (file.isDirectory()) {
258
259 result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
260 } else {
261 String name = file.getPath().toString();
262 int idx = name.lastIndexOf('.');
263 if (idx > 0) {
264 try {
265 long fileStartTime = Long.parseLong(name.substring(idx+1));
266 if (fileStartTime <= endTime) {
267 LOG.info("Found: " + name);
268 result.add(file);
269 }
270 } catch (NumberFormatException x) {
271 idx = 0;
272 }
273 }
274 if (idx == 0) {
275 LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
276 }
277 }
278 }
279 return result;
280 }
281
282 @Override
283 public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
284 TaskAttemptContext context) throws IOException, InterruptedException {
285 return new WALKeyRecordReader();
286 }
287 }