/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
 * <p>
 * One {@link WALSplit} is produced per WAL file found under the configured input paths. The
 * optional {@link #START_TIME_KEY} / {@link #END_TIME_KEY} configuration values bound the write
 * times of the entries handed to the mapper.
 */
@InterfaceAudience.Public
public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
  private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class);

  /** Configuration key for the lower (inclusive) write-time bound; read as a long. */
  public static final String START_TIME_KEY = "wal.start.time";
  /** Configuration key for the upper (inclusive) write-time bound; read as a long. */
  public static final String END_TIME_KEY = "wal.end.time";

  /**
   * {@link InputSplit} for {@link WAL} files. Each split represent exactly one log file.
   */
  static class WALSplit extends InputSplit implements Writable {
    private String logFileName;
    private long fileSize;
    private long startTime;
    private long endTime;

    /** for serialization */
    public WALSplit() {
    }

    /**
     * Represent an WALSplit, i.e. a single WAL file. Start- and EndTime are managed by the split,
     * so that WAL files can be filtered before WALEdits are passed to the mapper(s).
     * @param logFileName path of the WAL file, as a string
     * @param fileSize    length of the file, reported by {@link #getLength()}
     * @param startTime   lower write-time bound carried to the record reader
     * @param endTime     upper write-time bound carried to the record reader
     */
    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
      this.logFileName = logFileName;
      this.fileSize = fileSize;
      this.startTime = startTime;
      this.endTime = endTime;
    }

    /** Returns the file size given at construction (or read by {@link #readFields}). */
    @Override
    public long getLength() throws IOException, InterruptedException {
      return fileSize;
    }

    /** Returns an empty array; no locality hints are provided. */
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      // TODO: Find the data node with the most blocks for this WAL?
      return new String[] {};
    }

    public String getLogFileName() {
      return logFileName;
    }

    public long getStartTime() {
      return startTime;
    }

    public long getEndTime() {
      return endTime;
    }

    /**
     * Restores the split from {@code in}. Field order must mirror {@link #write(DataOutput)}:
     * file name (UTF), file size, start time, end time.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      logFileName = in.readUTF();
      fileSize = in.readLong();
      startTime = in.readLong();
      endTime = in.readLong();
    }

    /**
     * Serializes the split to {@code out}. Field order must mirror {@link #readFields(DataInput)}:
     * file name (UTF), file size, start time, end time.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(logFileName);
      out.writeLong(fileSize);
      out.writeLong(startTime);
      out.writeLong(endTime);
    }

    @Override
    public String toString() {
      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
    }
  }

  /**
   * {@link RecordReader} for an {@link WAL} file. Implementation shared with deprecated
   * HLogInputFormat.
   */
  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
    private Reader reader = null;
    // visible until we can remove the deprecated HLogInputFormat
    Entry currentEntry = new Entry();
    private long startTime;
    private long endTime;
    private Configuration conf;
    private Path logFile;
    // Reader position recorded at the start of the latest nextKeyValue() call; a reader that is
    // re-opened on another path seeks back to it.
    private long currentPos;

    /**
     * Opens a reader on the WAL file named by the split and picks up the split's time bounds.
     * @param split   must be a {@link WALSplit}
     * @param context supplies the {@link Configuration} used to open the reader
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
      WALSplit hsplit = (WALSplit) split;
      logFile = new Path(hsplit.getLogFileName());
      conf = context.getConfiguration();
      LOG.info("Opening {} for {}", logFile, split);
      openReader(logFile);
      this.startTime = hsplit.getStartTime();
      this.endTime = hsplit.getEndTime();
    }

    /**
     * Closes any reader currently held, opens a new one on {@code path}, seeks it to the last
     * recorded position and remembers {@code path} as the current log file.
     */
    private void openReader(Path path) throws IOException {
      closeReader();
      reader = AbstractFSWALProvider.openReader(path, conf);
      seek();
      setCurrentPath(path);
    }

    private void setCurrentPath(Path path) {
      this.logFile = path;
    }

    /** Closes the reader if one is open and clears the field, so repeated calls are no-ops. */
    private void closeReader() throws IOException {
      if (reader != null) {
        reader.close();
        reader = null;
      }
    }

    /** Seeks the reader to {@link #currentPos}; skipped when the position is still 0. */
    private void seek() throws IOException {
      if (currentPos != 0) {
        reader.seek(currentPos);
      }
    }

    /**
     * Advances to the next entry whose write time is at or after the start time.
     * <p>
     * Returns {@code false} when there is no open reader, when an {@link EOFException} is hit
     * while reading, when the file is exhausted, or when the entry found has a write time after
     * the end time. On any other {@link IOException} an archived copy of the log is looked up;
     * if one is found the reader is re-opened on it at the position recorded on entry to this
     * call and the read is retried, otherwise the exception is rethrown.
     * @return {@code true} if {@link #currentEntry} now holds an entry within the time range
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (reader == null) {
        return false;
      }
      this.currentPos = reader.getPosition();
      Entry temp;
      // Starts at -1 so that, once the loop exits, it counts only the entries that were skipped
      // (the entry that ended the loop is not a skip).
      long skipped = -1;
      try {
        do {
          // skip older entries
          try {
            temp = reader.next(currentEntry);
            skipped++;
          } catch (EOFException x) {
            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
              + " (This is normal when a RegionServer crashed.)");
            return false;
          }
        } while (temp != null && temp.getKey().getWriteTime() < startTime);

        if (temp == null) {
          if (skipped > 0) {
            LOG.info("Skipped {} entries.", skipped);
          }
          LOG.info("Reached end of file.");
          return false;
        } else if (skipped > 0) {
          LOG.info("Skipped {} entries, until ts: {}.", skipped, temp.getKey().getWriteTime());
        }
        boolean res = temp.getKey().getWriteTime() <= endTime;
        if (!res) {
          LOG.info("Reached ts: {} ignoring the rest of the file.", temp.getKey().getWriteTime());
        }
        return res;
      } catch (IOException e) {
        Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
        // archivedLog can be null if unable to locate in archiveDir.
        if (archivedLog != null) {
          openReader(archivedLog);
          // Try call again in recursion
          return nextKeyValue();
        } else {
          throw e;
        }
      }
    }

    /** Returns the edit of the entry most recently read into {@link #currentEntry}. */
    @Override
    public WALEdit getCurrentValue() throws IOException, InterruptedException {
      return currentEntry.getEdit();
    }

    /** Always returns 0. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
      // N/A depends on total number of entries, which is unknown
      return 0;
    }

    /**
     * Closes the underlying reader, if any. Safe to call more than once: the reader reference is
     * cleared, so a later {@link #nextKeyValue()} returns {@code false} instead of touching a
     * closed reader.
     */
    @Override
    public void close() throws IOException {
      LOG.info("Closing reader");
      closeReader();
    }
  }

  /**
   * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer need to
   * support HLogInputFormat.
   */
  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
    /** Returns the key of the entry most recently read. */
    @Override
    public WALKey getCurrentKey() throws IOException, InterruptedException {
      return currentEntry.getKey();
    }
  }

  /**
   * Builds the splits using {@link #START_TIME_KEY} and {@link #END_TIME_KEY} as the names of the
   * time-bound configuration properties.
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
  }

  /**
   * implementation shared with deprecated HLogInputFormat
   * <p>
   * Collects the files under every configured input path and wraps each one in a
   * {@link WALSplit}. A missing input path is skipped with a warning when
   * {@link WALPlayer#IGNORE_MISSING_FILES} is set to true; otherwise the
   * {@link FileNotFoundException} propagates.
   * @param startKey configuration key holding the start time (defaults to {@link Long#MIN_VALUE})
   * @param endKey   configuration key holding the end time (defaults to {@link Long#MAX_VALUE})
   */
  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
    throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
    Path[] inputPaths = getInputPaths(conf);
    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
    long endTime = conf.getLong(endKey, Long.MAX_VALUE);

    List<FileStatus> allFiles = new ArrayList<>();
    for (Path inputPath : inputPaths) {
      FileSystem fs = inputPath.getFileSystem(conf);
      try {
        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
        allFiles.addAll(files);
      } catch (FileNotFoundException e) {
        if (ignoreMissing) {
          LOG.warn("File {} is missing. Skipping it.", inputPath);
          continue;
        }
        throw e;
      }
    }
    List<InputSplit> splits = new ArrayList<>(allFiles.size());
    for (FileStatus file : allFiles) {
      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
    }
    return splits;
  }

  /**
   * Reads the input paths from {@link FileInputFormat#INPUT_DIR}, split on the separator
   * configured under {@link WALPlayer#INPUT_FILES_SEPARATOR_KEY} (default {@code ","}). The
   * separator is passed to {@link String#split(String)} and is therefore treated as a regular
   * expression.
   * @throws IOException if no input paths are configured
   */
  private Path[] getInputPaths(Configuration conf) throws IOException {
    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
    if (inpDirs == null) {
      // Fail with a message naming the missing key instead of a bare NullPointerException.
      throw new IOException(
        "No input paths specified; " + FileInputFormat.INPUT_DIR + " is not set");
    }
    return StringUtils
      .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
  }

  /**
   * Lists {@code dir}, descending into sub directories, and collects the files accepted by
   * {@link #addFile}.
   * @param startTime If file looks like it has a timestamp in its name, we'll check if newer or
   *                  equal to this value else we will filter out the file. If name does not seem to
   *                  have a timestamp, we will just return it w/o filtering.
   * @param endTime   If file looks like it has a timestamp in its name, we'll check if older or
   *                  equal to this value else we will filter out the file. If name does not seem to
   *                  have a timestamp, we will just return it w/o filtering.
   */
  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
    throws IOException {
    List<FileStatus> result = new ArrayList<>();
    LOG.debug("Scanning {} for WAL files", dir);
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
    if (!iter.hasNext()) {
      return Collections.emptyList();
    }
    while (iter.hasNext()) {
      LocatedFileStatus file = iter.next();
      if (file.isDirectory()) {
        // Recurse into sub directories
        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
      } else {
        addFile(result, file, startTime, endTime);
      }
    }
    // TODO: These results should be sorted? Results could be content of recovered.edits directory
    // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and
    // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator
    return result;
  }

  /**
   * Appends {@code lfs} to {@code result} unless the timestamp derived from its file name is
   * positive and falls outside {@code [startTime, endTime]}. Files for which
   * {@link WAL#getTimestamp(String)} returns a non-positive value are always added.
   */
  static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime,
    long endTime) {
    long timestamp = WAL.getTimestamp(lfs.getPath().getName());
    if (timestamp > 0) {
      // Looks like a valid timestamp.
      if (timestamp <= endTime && timestamp >= startTime) {
        LOG.info("Found {}", lfs.getPath());
        result.add(lfs);
      } else {
        LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(), startTime,
          Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime));
      }
    } else {
      // If no timestamp, add it regardless.
      LOG.info("Found (no-timestamp!) {}", lfs);
      result.add(lfs);
    }
  }

  /** Returns a new {@link WALKeyRecordReader}; the arguments are not used here. */
  @Override
  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
    return new WALKeyRecordReader();
  }
}