001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.EOFException; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.LocatedFileStatus; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.fs.RemoteIterator; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038import org.apache.hadoop.hbase.wal.WALEdit; 039import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 040import org.apache.hadoop.hbase.wal.WAL; 041import org.apache.hadoop.hbase.wal.WAL.Entry; 042import org.apache.hadoop.hbase.wal.WAL.Reader; 043import org.apache.hadoop.hbase.wal.WALKey; 044import org.apache.hadoop.io.Writable; 045import org.apache.hadoop.mapreduce.InputFormat; 046import org.apache.hadoop.mapreduce.InputSplit; 047import org.apache.hadoop.mapreduce.JobContext; 048import org.apache.hadoop.mapreduce.RecordReader; 049import org.apache.hadoop.mapreduce.TaskAttemptContext; 050import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 051import org.apache.hadoop.util.StringUtils; 052 053/** 054 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files. 055 */ 056@InterfaceAudience.Public 057public class WALInputFormat extends InputFormat<WALKey, WALEdit> { 058 private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class); 059 060 public static final String START_TIME_KEY = "wal.start.time"; 061 public static final String END_TIME_KEY = "wal.end.time"; 062 063 /** 064 * {@link InputSplit} for {@link WAL} files. Each split represent 065 * exactly one log file. 066 */ 067 static class WALSplit extends InputSplit implements Writable { 068 private String logFileName; 069 private long fileSize; 070 private long startTime; 071 private long endTime; 072 073 /** for serialization */ 074 public WALSplit() {} 075 076 /** 077 * Represent an WALSplit, i.e. a single WAL file. 078 * Start- and EndTime are managed by the split, so that WAL files can be 079 * filtered before WALEdits are passed to the mapper(s). 080 * @param logFileName 081 * @param fileSize 082 * @param startTime 083 * @param endTime 084 */ 085 public WALSplit(String logFileName, long fileSize, long startTime, long endTime) { 086 this.logFileName = logFileName; 087 this.fileSize = fileSize; 088 this.startTime = startTime; 089 this.endTime = endTime; 090 } 091 092 @Override 093 public long getLength() throws IOException, InterruptedException { 094 return fileSize; 095 } 096 097 @Override 098 public String[] getLocations() throws IOException, InterruptedException { 099 // TODO: Find the data node with the most blocks for this WAL? 100 return new String[] {}; 101 } 102 103 public String getLogFileName() { 104 return logFileName; 105 } 106 107 public long getStartTime() { 108 return startTime; 109 } 110 111 public long getEndTime() { 112 return endTime; 113 } 114 115 @Override 116 public void readFields(DataInput in) throws IOException { 117 logFileName = in.readUTF(); 118 fileSize = in.readLong(); 119 startTime = in.readLong(); 120 endTime = in.readLong(); 121 } 122 123 @Override 124 public void write(DataOutput out) throws IOException { 125 out.writeUTF(logFileName); 126 out.writeLong(fileSize); 127 out.writeLong(startTime); 128 out.writeLong(endTime); 129 } 130 131 @Override 132 public String toString() { 133 return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize; 134 } 135 } 136 137 /** 138 * {@link RecordReader} for an {@link WAL} file. 139 * Implementation shared with deprecated HLogInputFormat. 140 */ 141 static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> { 142 private Reader reader = null; 143 // visible until we can remove the deprecated HLogInputFormat 144 Entry currentEntry = new Entry(); 145 private long startTime; 146 private long endTime; 147 private Configuration conf; 148 private Path logFile; 149 private long currentPos; 150 151 @Override 152 public void initialize(InputSplit split, TaskAttemptContext context) 153 throws IOException, InterruptedException { 154 WALSplit hsplit = (WALSplit)split; 155 logFile = new Path(hsplit.getLogFileName()); 156 conf = context.getConfiguration(); 157 LOG.info("Opening reader for "+split); 158 openReader(logFile); 159 this.startTime = hsplit.getStartTime(); 160 this.endTime = hsplit.getEndTime(); 161 } 162 163 private void openReader(Path path) throws IOException 164 { 165 closeReader(); 166 reader = AbstractFSWALProvider.openReader(path, conf); 167 seek(); 168 setCurrentPath(path); 169 } 170 171 private void setCurrentPath(Path path) { 172 this.logFile = path; 173 } 174 175 private void closeReader() throws IOException { 176 if (reader != null) { 177 reader.close(); 178 reader = null; 179 } 180 } 181 182 private void seek() throws IOException { 183 if (currentPos != 0) { 184 reader.seek(currentPos); 185 } 186 } 187 188 @Override 189 public boolean nextKeyValue() throws IOException, InterruptedException { 190 if (reader == null) return false; 191 this.currentPos = reader.getPosition(); 192 Entry temp; 193 long i = -1; 194 try { 195 do { 196 // skip older entries 197 try { 198 temp = reader.next(currentEntry); 199 i++; 200 } catch (EOFException x) { 201 LOG.warn("Corrupted entry detected. Ignoring the rest of the file." 202 + " (This is normal when a RegionServer crashed.)"); 203 return false; 204 } 205 } while (temp != null && temp.getKey().getWriteTime() < startTime); 206 207 if (temp == null) { 208 if (i > 0) LOG.info("Skipped " + i + " entries."); 209 LOG.info("Reached end of file."); 210 return false; 211 } else if (i > 0) { 212 LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + "."); 213 } 214 boolean res = temp.getKey().getWriteTime() <= endTime; 215 if (!res) { 216 LOG.info("Reached ts: " + temp.getKey().getWriteTime() 217 + " ignoring the rest of the file."); 218 } 219 return res; 220 } catch (IOException e) { 221 Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); 222 if (logFile != archivedLog) { 223 openReader(archivedLog); 224 // Try call again in recursion 225 return nextKeyValue(); 226 } else { 227 throw e; 228 } 229 } 230 } 231 232 @Override 233 public WALEdit getCurrentValue() throws IOException, InterruptedException { 234 return currentEntry.getEdit(); 235 } 236 237 @Override 238 public float getProgress() throws IOException, InterruptedException { 239 // N/A depends on total number of entries, which is unknown 240 return 0; 241 } 242 243 @Override 244 public void close() throws IOException { 245 LOG.info("Closing reader"); 246 if (reader != null) this.reader.close(); 247 } 248 } 249 250 /** 251 * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer 252 * need to support HLogInputFormat. 253 */ 254 static class WALKeyRecordReader extends WALRecordReader<WALKey> { 255 @Override 256 public WALKey getCurrentKey() throws IOException, InterruptedException { 257 return currentEntry.getKey(); 258 } 259 } 260 261 @Override 262 public List<InputSplit> getSplits(JobContext context) throws IOException, 263 InterruptedException { 264 return getSplits(context, START_TIME_KEY, END_TIME_KEY); 265 } 266 267 /** 268 * implementation shared with deprecated HLogInputFormat 269 */ 270 List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey) 271 throws IOException, InterruptedException { 272 Configuration conf = context.getConfiguration(); 273 boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false); 274 Path[] inputPaths = getInputPaths(conf); 275 long startTime = conf.getLong(startKey, Long.MIN_VALUE); 276 long endTime = conf.getLong(endKey, Long.MAX_VALUE); 277 278 List<FileStatus> allFiles = new ArrayList<FileStatus>(); 279 for(Path inputPath: inputPaths){ 280 FileSystem fs = inputPath.getFileSystem(conf); 281 try { 282 List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime); 283 allFiles.addAll(files); 284 } catch (FileNotFoundException e) { 285 if (ignoreMissing) { 286 LOG.warn("File "+ inputPath +" is missing. Skipping it."); 287 continue; 288 } 289 throw e; 290 } 291 } 292 List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size()); 293 for (FileStatus file : allFiles) { 294 splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); 295 } 296 return splits; 297 } 298 299 private Path[] getInputPaths(Configuration conf) { 300 String inpDirs = conf.get(FileInputFormat.INPUT_DIR); 301 return StringUtils.stringToPath( 302 inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); 303 } 304 305 private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) 306 throws IOException { 307 List<FileStatus> result = new ArrayList<>(); 308 LOG.debug("Scanning " + dir.toString() + " for WAL files"); 309 310 RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir); 311 if (!iter.hasNext()) return Collections.emptyList(); 312 while (iter.hasNext()) { 313 LocatedFileStatus file = iter.next(); 314 if (file.isDirectory()) { 315 // recurse into sub directories 316 result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); 317 } else { 318 String name = file.getPath().toString(); 319 int idx = name.lastIndexOf('.'); 320 if (idx > 0) { 321 try { 322 long fileStartTime = Long.parseLong(name.substring(idx+1)); 323 if (fileStartTime <= endTime) { 324 LOG.info("Found: " + file); 325 result.add(file); 326 } 327 } catch (NumberFormatException x) { 328 idx = 0; 329 } 330 } 331 if (idx == 0) { 332 LOG.warn("File " + name + " does not appear to be an WAL file. Skipping..."); 333 } 334 } 335 } 336 return result; 337 } 338 339 @Override 340 public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split, 341 TaskAttemptContext context) throws IOException, InterruptedException { 342 return new WALKeyRecordReader(); 343 } 344}