001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.EOFException; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.time.Instant; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.LocatedFileStatus; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.fs.RemoteIterator; 035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 036import org.apache.hadoop.hbase.wal.WAL; 037import org.apache.hadoop.hbase.wal.WAL.Entry; 038import org.apache.hadoop.hbase.wal.WAL.Reader; 039import org.apache.hadoop.hbase.wal.WALEdit; 040import org.apache.hadoop.hbase.wal.WALKey; 041import org.apache.hadoop.io.Writable; 042import org.apache.hadoop.mapreduce.InputFormat; 043import org.apache.hadoop.mapreduce.InputSplit; 044import org.apache.hadoop.mapreduce.JobContext; 045import org.apache.hadoop.mapreduce.RecordReader; 046import org.apache.hadoop.mapreduce.TaskAttemptContext; 047import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 048import org.apache.hadoop.util.StringUtils; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files. 055 */ 056@InterfaceAudience.Public 057public class WALInputFormat extends InputFormat<WALKey, WALEdit> { 058 private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class); 059 060 public static final String START_TIME_KEY = "wal.start.time"; 061 public static final String END_TIME_KEY = "wal.end.time"; 062 063 /** 064 * {@link InputSplit} for {@link WAL} files. Each split represent 065 * exactly one log file. 066 */ 067 static class WALSplit extends InputSplit implements Writable { 068 private String logFileName; 069 private long fileSize; 070 private long startTime; 071 private long endTime; 072 073 /** for serialization */ 074 public WALSplit() {} 075 076 /** 077 * Represent an WALSplit, i.e. a single WAL file. 078 * Start- and EndTime are managed by the split, so that WAL files can be 079 * filtered before WALEdits are passed to the mapper(s). 080 */ 081 public WALSplit(String logFileName, long fileSize, long startTime, long endTime) { 082 this.logFileName = logFileName; 083 this.fileSize = fileSize; 084 this.startTime = startTime; 085 this.endTime = endTime; 086 } 087 088 @Override 089 public long getLength() throws IOException, InterruptedException { 090 return fileSize; 091 } 092 093 @Override 094 public String[] getLocations() throws IOException, InterruptedException { 095 // TODO: Find the data node with the most blocks for this WAL? 096 return new String[] {}; 097 } 098 099 public String getLogFileName() { 100 return logFileName; 101 } 102 103 public long getStartTime() { 104 return startTime; 105 } 106 107 public long getEndTime() { 108 return endTime; 109 } 110 111 @Override 112 public void readFields(DataInput in) throws IOException { 113 logFileName = in.readUTF(); 114 fileSize = in.readLong(); 115 startTime = in.readLong(); 116 endTime = in.readLong(); 117 } 118 119 @Override 120 public void write(DataOutput out) throws IOException { 121 out.writeUTF(logFileName); 122 out.writeLong(fileSize); 123 out.writeLong(startTime); 124 out.writeLong(endTime); 125 } 126 127 @Override 128 public String toString() { 129 return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize; 130 } 131 } 132 133 /** 134 * {@link RecordReader} for an {@link WAL} file. 135 * Implementation shared with deprecated HLogInputFormat. 136 */ 137 static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> { 138 private Reader reader = null; 139 // visible until we can remove the deprecated HLogInputFormat 140 Entry currentEntry = new Entry(); 141 private long startTime; 142 private long endTime; 143 private Configuration conf; 144 private Path logFile; 145 private long currentPos; 146 147 @Override 148 public void initialize(InputSplit split, TaskAttemptContext context) 149 throws IOException, InterruptedException { 150 WALSplit hsplit = (WALSplit)split; 151 logFile = new Path(hsplit.getLogFileName()); 152 conf = context.getConfiguration(); 153 LOG.info("Opening {} for {}", logFile, split); 154 openReader(logFile); 155 this.startTime = hsplit.getStartTime(); 156 this.endTime = hsplit.getEndTime(); 157 } 158 159 private void openReader(Path path) throws IOException { 160 closeReader(); 161 reader = AbstractFSWALProvider.openReader(path, conf); 162 seek(); 163 setCurrentPath(path); 164 } 165 166 private void setCurrentPath(Path path) { 167 this.logFile = path; 168 } 169 170 private void closeReader() throws IOException { 171 if (reader != null) { 172 reader.close(); 173 reader = null; 174 } 175 } 176 177 private void seek() throws IOException { 178 if (currentPos != 0) { 179 reader.seek(currentPos); 180 } 181 } 182 183 @Override 184 public boolean nextKeyValue() throws IOException, InterruptedException { 185 if (reader == null) { 186 return false; 187 } 188 this.currentPos = reader.getPosition(); 189 Entry temp; 190 long i = -1; 191 try { 192 do { 193 // skip older entries 194 try { 195 temp = reader.next(currentEntry); 196 i++; 197 } catch (EOFException x) { 198 LOG.warn("Corrupted entry detected. Ignoring the rest of the file." 199 + " (This is normal when a RegionServer crashed.)"); 200 return false; 201 } 202 } while (temp != null && temp.getKey().getWriteTime() < startTime); 203 204 if (temp == null) { 205 if (i > 0) { 206 LOG.info("Skipped " + i + " entries."); 207 } 208 LOG.info("Reached end of file."); 209 return false; 210 } else if (i > 0) { 211 LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + "."); 212 } 213 boolean res = temp.getKey().getWriteTime() <= endTime; 214 if (!res) { 215 LOG.info("Reached ts: " + temp.getKey().getWriteTime() 216 + " ignoring the rest of the file."); 217 } 218 return res; 219 } catch (IOException e) { 220 Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); 221 if (logFile != archivedLog) { 222 openReader(archivedLog); 223 // Try call again in recursion 224 return nextKeyValue(); 225 } else { 226 throw e; 227 } 228 } 229 } 230 231 @Override 232 public WALEdit getCurrentValue() throws IOException, InterruptedException { 233 return currentEntry.getEdit(); 234 } 235 236 @Override 237 public float getProgress() throws IOException, InterruptedException { 238 // N/A depends on total number of entries, which is unknown 239 return 0; 240 } 241 242 @Override 243 public void close() throws IOException { 244 LOG.info("Closing reader"); 245 if (reader != null) { 246 this.reader.close(); 247 } 248 } 249 } 250 251 /** 252 * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer 253 * need to support HLogInputFormat. 254 */ 255 static class WALKeyRecordReader extends WALRecordReader<WALKey> { 256 @Override 257 public WALKey getCurrentKey() throws IOException, InterruptedException { 258 return currentEntry.getKey(); 259 } 260 } 261 262 @Override 263 public List<InputSplit> getSplits(JobContext context) throws IOException, 264 InterruptedException { 265 return getSplits(context, START_TIME_KEY, END_TIME_KEY); 266 } 267 268 /** 269 * implementation shared with deprecated HLogInputFormat 270 */ 271 List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey) 272 throws IOException, InterruptedException { 273 Configuration conf = context.getConfiguration(); 274 boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false); 275 Path[] inputPaths = getInputPaths(conf); 276 long startTime = conf.getLong(startKey, Long.MIN_VALUE); 277 long endTime = conf.getLong(endKey, Long.MAX_VALUE); 278 279 List<FileStatus> allFiles = new ArrayList<FileStatus>(); 280 for(Path inputPath: inputPaths){ 281 FileSystem fs = inputPath.getFileSystem(conf); 282 try { 283 List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime); 284 allFiles.addAll(files); 285 } catch (FileNotFoundException e) { 286 if (ignoreMissing) { 287 LOG.warn("File "+ inputPath +" is missing. Skipping it."); 288 continue; 289 } 290 throw e; 291 } 292 } 293 List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size()); 294 for (FileStatus file : allFiles) { 295 splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); 296 } 297 return splits; 298 } 299 300 private Path[] getInputPaths(Configuration conf) { 301 String inpDirs = conf.get(FileInputFormat.INPUT_DIR); 302 return StringUtils.stringToPath( 303 inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); 304 } 305 306 /** 307 * @param startTime If file looks like it has a timestamp in its name, we'll check if newer 308 * or equal to this value else we will filter out the file. If name does not 309 * seem to have a timestamp, we will just return it w/o filtering. 310 * @param endTime If file looks like it has a timestamp in its name, we'll check if older or equal 311 * to this value else we will filter out the file. If name does not seem to 312 * have a timestamp, we will just return it w/o filtering. 313 */ 314 private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) 315 throws IOException { 316 List<FileStatus> result = new ArrayList<>(); 317 LOG.debug("Scanning " + dir.toString() + " for WAL files"); 318 RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir); 319 if (!iter.hasNext()) { 320 return Collections.emptyList(); 321 } 322 while (iter.hasNext()) { 323 LocatedFileStatus file = iter.next(); 324 if (file.isDirectory()) { 325 // Recurse into sub directories 326 result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); 327 } else { 328 addFile(result, file, startTime, endTime); 329 } 330 } 331 // TODO: These results should be sorted? Results could be content of recovered.edits directory 332 // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and 333 // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator 334 return result; 335 } 336 337 static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime, 338 long endTime) { 339 long timestamp = WAL.getTimestamp(lfs.getPath().getName()); 340 if (timestamp > 0) { 341 // Looks like a valid timestamp. 342 if (timestamp <= endTime && timestamp >= startTime) { 343 LOG.info("Found {}", lfs.getPath()); 344 result.add(lfs); 345 } else { 346 LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(), 347 startTime, Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime)); 348 } 349 } else { 350 // If no timestamp, add it regardless. 351 LOG.info("Found (no-timestamp!) {}", lfs); 352 result.add(lfs); 353 } 354 } 355 356 @Override 357 public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split, 358 TaskAttemptContext context) throws IOException, InterruptedException { 359 return new WALKeyRecordReader(); 360 } 361}