001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.EOFException; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.time.Instant; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.LocatedFileStatus; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.fs.RemoteIterator; 035import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException; 036import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 037import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 038import org.apache.hadoop.hbase.wal.WAL; 039import org.apache.hadoop.hbase.wal.WAL.Entry; 040import org.apache.hadoop.hbase.wal.WALEdit; 041import org.apache.hadoop.hbase.wal.WALFactory; 042import org.apache.hadoop.hbase.wal.WALKey; 043import org.apache.hadoop.hbase.wal.WALStreamReader; 044import org.apache.hadoop.io.Writable; 045import org.apache.hadoop.mapreduce.InputFormat; 046import org.apache.hadoop.mapreduce.InputSplit; 047import org.apache.hadoop.mapreduce.JobContext; 048import org.apache.hadoop.mapreduce.RecordReader; 049import org.apache.hadoop.mapreduce.TaskAttemptContext; 050import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 051import org.apache.hadoop.mapreduce.security.TokenCache; 052import org.apache.hadoop.util.StringUtils; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files. 059 */ 060@InterfaceAudience.Public 061public class WALInputFormat extends InputFormat<WALKey, WALEdit> { 062 private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class); 063 064 public static final String START_TIME_KEY = "wal.start.time"; 065 public static final String END_TIME_KEY = "wal.end.time"; 066 067 /** 068 * {@link InputSplit} for {@link WAL} files. Each split represent exactly one log file. 069 */ 070 static class WALSplit extends InputSplit implements Writable { 071 private String logFileName; 072 private long fileSize; 073 private long startTime; 074 private long endTime; 075 076 /** for serialization */ 077 public WALSplit() { 078 } 079 080 /** 081 * Represent an WALSplit, i.e. a single WAL file. Start- and EndTime are managed by the split, 082 * so that WAL files can be filtered before WALEdits are passed to the mapper(s). 083 */ 084 public WALSplit(String logFileName, long fileSize, long startTime, long endTime) { 085 this.logFileName = logFileName; 086 this.fileSize = fileSize; 087 this.startTime = startTime; 088 this.endTime = endTime; 089 } 090 091 @Override 092 public long getLength() throws IOException, InterruptedException { 093 return fileSize; 094 } 095 096 @Override 097 public String[] getLocations() throws IOException, InterruptedException { 098 // TODO: Find the data node with the most blocks for this WAL? 099 return new String[] {}; 100 } 101 102 public String getLogFileName() { 103 return logFileName; 104 } 105 106 public long getStartTime() { 107 return startTime; 108 } 109 110 public long getEndTime() { 111 return endTime; 112 } 113 114 @Override 115 public void readFields(DataInput in) throws IOException { 116 logFileName = in.readUTF(); 117 fileSize = in.readLong(); 118 startTime = in.readLong(); 119 endTime = in.readLong(); 120 } 121 122 @Override 123 public void write(DataOutput out) throws IOException { 124 out.writeUTF(logFileName); 125 out.writeLong(fileSize); 126 out.writeLong(startTime); 127 out.writeLong(endTime); 128 } 129 130 @Override 131 public String toString() { 132 return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize; 133 } 134 } 135 136 /** 137 * {@link RecordReader} for an {@link WAL} file. Implementation shared with deprecated 138 * HLogInputFormat. 139 */ 140 static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> { 141 private WALStreamReader reader = null; 142 // visible until we can remove the deprecated HLogInputFormat 143 Entry currentEntry = new Entry(); 144 private long startTime; 145 private long endTime; 146 private Configuration conf; 147 private Path logFile; 148 private long currentPos; 149 150 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION", 151 justification = "HDFS-4380") 152 private WALStreamReader openReader(Path path, long startPosition) throws IOException { 153 long retryInterval = 2000; // 2 sec 154 int maxAttempts = 30; 155 int attempt = 0; 156 Exception ee = null; 157 WALStreamReader reader = null; 158 while (reader == null && attempt++ < maxAttempts) { 159 try { 160 // Detect if this is a new file, if so get a new reader else 161 // reset the current reader so that we see the new data 162 reader = 163 WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition); 164 return reader; 165 } catch (WALHeaderEOFException wheofe) { 166 // We hit EOF while reading the WAL header. A file that ever had an entry synced to it 167 // necessarily has a complete, readable header (a sync flushes the header too), so a 168 // header EOF means the file holds nothing recoverable right now. For a file that is not 169 // being actively written (a closed/archived WAL, or one left empty by a crashed 170 // RegionServer) the header never appears, so retrying only delays an inevitable skip. 171 // The one case a retry could help is a WAL still being written by the legacy 172 // (non-async) writer that has not yet flushed its header; but we skip that too. 173 LOG.warn("Got WALHeaderEOFException opening reader for {}, skipping empty WAL file.", 174 path, wheofe); 175 return null; 176 } catch (LeaseNotRecoveredException lnre) { 177 // HBASE-15019 the WAL was not closed due to some hiccup. 178 LOG.warn("Try to recover the WAL lease " + path, lnre); 179 AbstractFSWALProvider.recoverLease(conf, path); 180 reader = null; 181 ee = lnre; 182 } catch (NullPointerException npe) { 183 // Workaround for race condition in HDFS-4380 184 // which throws a NPE if we open a file before any data node has the most recent block 185 // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 186 LOG.warn("Got NPE opening reader, will retry."); 187 reader = null; 188 ee = npe; 189 } 190 if (reader == null) { 191 // sleep before next attempt 192 try { 193 Thread.sleep(retryInterval); 194 } catch (InterruptedException e) { 195 Thread.currentThread().interrupt(); 196 } 197 } 198 } 199 throw new IOException("Could not open reader", ee); 200 } 201 202 @Override 203 public void initialize(InputSplit split, TaskAttemptContext context) 204 throws IOException, InterruptedException { 205 WALSplit hsplit = (WALSplit) split; 206 logFile = new Path(hsplit.getLogFileName()); 207 conf = context.getConfiguration(); 208 LOG.info("Opening {} for {}", logFile, split); 209 openReader(logFile); 210 this.startTime = hsplit.getStartTime(); 211 this.endTime = hsplit.getEndTime(); 212 } 213 214 private void openReader(Path path) throws IOException { 215 closeReader(); 216 reader = openReader(path, currentPos > 0 ? currentPos : -1); 217 setCurrentPath(path); 218 } 219 220 private void setCurrentPath(Path path) { 221 this.logFile = path; 222 } 223 224 private void closeReader() throws IOException { 225 if (reader != null) { 226 reader.close(); 227 reader = null; 228 } 229 } 230 231 @Override 232 public boolean nextKeyValue() throws IOException, InterruptedException { 233 if (reader == null) { 234 return false; 235 } 236 this.currentPos = reader.getPosition(); 237 Entry temp; 238 long i = -1; 239 try { 240 do { 241 // skip older entries 242 try { 243 temp = reader.next(currentEntry); 244 i++; 245 } catch (EOFException x) { 246 LOG.warn("Corrupted entry detected. Ignoring the rest of the file." 247 + " (This is normal when a RegionServer crashed.)"); 248 return false; 249 } 250 } while (temp != null && temp.getKey().getWriteTime() < startTime); 251 252 if (temp == null) { 253 if (i > 0) { 254 LOG.info("Skipped " + i + " entries."); 255 } 256 LOG.info("Reached end of file."); 257 return false; 258 } else if (i > 0) { 259 LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + "."); 260 } 261 boolean res = temp.getKey().getWriteTime() <= endTime; 262 if (!res) { 263 LOG.info( 264 "Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file."); 265 } 266 return res; 267 } catch (IOException e) { 268 Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf); 269 // archivedLog can be null if unable to locate in archiveDir. 270 if (archivedLog != null) { 271 openReader(archivedLog); 272 // Try call again in recursion 273 return nextKeyValue(); 274 } else { 275 throw e; 276 } 277 } 278 } 279 280 @Override 281 public WALEdit getCurrentValue() throws IOException, InterruptedException { 282 return currentEntry.getEdit(); 283 } 284 285 @Override 286 public float getProgress() throws IOException, InterruptedException { 287 // N/A depends on total number of entries, which is unknown 288 return 0; 289 } 290 291 @Override 292 public void close() throws IOException { 293 LOG.info("Closing reader"); 294 if (reader != null) { 295 this.reader.close(); 296 } 297 } 298 } 299 300 /** 301 * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer need to 302 * support HLogInputFormat. 303 */ 304 static class WALKeyRecordReader extends WALRecordReader<WALKey> { 305 @Override 306 public WALKey getCurrentKey() throws IOException, InterruptedException { 307 return currentEntry.getKey(); 308 } 309 } 310 311 @Override 312 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 313 return getSplits(context, START_TIME_KEY, END_TIME_KEY); 314 } 315 316 /** 317 * implementation shared with deprecated HLogInputFormat 318 */ 319 List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey) 320 throws IOException, InterruptedException { 321 Configuration conf = context.getConfiguration(); 322 boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false); 323 Path[] inputPaths = getInputPaths(conf); 324 // get delegation token for the filesystem 325 TokenCache.obtainTokensForNamenodes(context.getCredentials(), inputPaths, conf); 326 long startTime = conf.getLong(startKey, Long.MIN_VALUE); 327 long endTime = conf.getLong(endKey, Long.MAX_VALUE); 328 329 List<FileStatus> allFiles = new ArrayList<FileStatus>(); 330 for (Path inputPath : inputPaths) { 331 FileSystem fs = inputPath.getFileSystem(conf); 332 try { 333 List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime, conf); 334 allFiles.addAll(files); 335 } catch (FileNotFoundException e) { 336 if (ignoreMissing) { 337 LOG.warn("File " + inputPath + " is missing. Skipping it."); 338 continue; 339 } 340 throw e; 341 } 342 } 343 List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size()); 344 for (FileStatus file : allFiles) { 345 splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); 346 } 347 return splits; 348 } 349 350 private Path[] getInputPaths(Configuration conf) { 351 String inpDirs = conf.get(FileInputFormat.INPUT_DIR); 352 return StringUtils 353 .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); 354 } 355 356 /** 357 * @param startTime If file looks like it has a timestamp in its name, we'll check if newer or 358 * equal to this value else we will filter out the file. If name does not seem to 359 * have a timestamp, we will just return it w/o filtering. 360 * @param endTime If file looks like it has a timestamp in its name, we'll check if older or 361 * equal to this value else we will filter out the file. If name does not seem to 362 * have a timestamp, we will just return it w/o filtering. 363 */ 364 private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime, 365 Configuration conf) throws IOException { 366 List<FileStatus> result = new ArrayList<>(); 367 LOG.debug("Scanning " + dir.toString() + " for WAL files"); 368 RemoteIterator<LocatedFileStatus> iter = listLocatedFileStatus(fs, dir, conf); 369 if (!iter.hasNext()) { 370 return Collections.emptyList(); 371 } 372 while (iter.hasNext()) { 373 LocatedFileStatus file = iter.next(); 374 if (file.isDirectory()) { 375 // Recurse into sub directories 376 result.addAll(getFiles(fs, file.getPath(), startTime, endTime, conf)); 377 } else { 378 addFile(result, file, startTime, endTime); 379 } 380 } 381 // TODO: These results should be sorted? Results could be content of recovered.edits directory 382 // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and 383 // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator 384 return result; 385 } 386 387 static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime, 388 long endTime) { 389 long timestamp = AbstractFSWALProvider.getTimestamp(lfs.getPath().getName()); 390 if (timestamp > 0) { 391 // Looks like a valid timestamp. 392 if (timestamp <= endTime && timestamp >= startTime) { 393 LOG.info("Found {}", lfs.getPath()); 394 result.add(lfs); 395 } else { 396 LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(), startTime, 397 Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime)); 398 } 399 } else { 400 // If no timestamp, add it regardless. 401 LOG.info("Found (no-timestamp!) {}", lfs); 402 result.add(lfs); 403 } 404 } 405 406 @Override 407 public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split, 408 TaskAttemptContext context) throws IOException, InterruptedException { 409 return new WALKeyRecordReader(); 410 } 411 412 /** 413 * Attempts to return the {@link LocatedFileStatus} for the given directory. If the directory does 414 * not exist, it will check if the directory is an archived log file and try to find it 415 */ 416 private static RemoteIterator<LocatedFileStatus> listLocatedFileStatus(FileSystem fs, Path dir, 417 Configuration conf) throws IOException { 418 try { 419 return fs.listLocatedStatus(dir); 420 } catch (FileNotFoundException e) { 421 if (AbstractFSWALProvider.isArchivedLogFile(dir)) { 422 throw e; 423 } 424 425 LOG.warn("Log file {} not found, trying to find it in archive directory.", dir); 426 Path archiveFile = AbstractFSWALProvider.findArchivedLog(dir, conf); 427 if (archiveFile == null) { 428 LOG.error("Did not find archive file for {}", dir); 429 throw e; 430 } 431 432 return fs.listLocatedStatus(archiveFile); 433 } 434 } 435}