001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.io.Closeable; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.util.OptionalLong; 025import java.util.concurrent.PriorityBlockingQueue; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileStatus; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 033import org.apache.hadoop.hbase.util.CancelableProgressable; 034import org.apache.hadoop.hbase.util.CommonFSUtils; 035import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 036import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 037import org.apache.hadoop.hbase.wal.WAL.Entry; 038import org.apache.hadoop.hbase.wal.WAL.Reader; 039import org.apache.hadoop.hbase.wal.WALFactory; 040import org.apache.hadoop.ipc.RemoteException; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.yetus.audience.InterfaceStability; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually 048 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it 049 * dequeues it and starts reading from the next. 050 */ 051@InterfaceAudience.Private 052@InterfaceStability.Evolving 053class WALEntryStream implements Closeable { 054 private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); 055 056 private Reader reader; 057 private Path currentPath; 058 // cache of next entry for hasNext() 059 private Entry currentEntry; 060 // position for the current entry. As now we support peek, which means that the upper layer may 061 // choose to return before reading the current entry, so it is not safe to return the value below 062 // in getPosition. 063 private long currentPositionOfEntry = 0; 064 // position after reading current entry 065 private long currentPositionOfReader = 0; 066 private final PriorityBlockingQueue<Path> logQueue; 067 private final FileSystem fs; 068 private final Configuration conf; 069 private final WALFileLengthProvider walFileLengthProvider; 070 // which region server the WALs belong to 071 private final ServerName serverName; 072 private final MetricsSource metrics; 073 074 /** 075 * Create an entry stream over the given queue at the given start position 076 * @param logQueue the queue of WAL paths 077 * @param conf the {@link Configuration} to use to create {@link Reader} for this stream 078 * @param startPosition the position in the first WAL to start reading at 079 * @param walFileLengthProvider provides the length of the WAL file 080 * @param serverName the server name which all WALs belong to 081 * @param metrics the replication metrics 082 * @throws IOException 083 */ 084 public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf, 085 long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, 086 MetricsSource metrics) throws IOException { 087 this.logQueue = logQueue; 088 this.fs = CommonFSUtils.getWALFileSystem(conf); 089 this.conf = conf; 090 this.currentPositionOfEntry = startPosition; 091 this.walFileLengthProvider = walFileLengthProvider; 092 this.serverName = serverName; 093 this.metrics = metrics; 094 } 095 096 /** 097 * @return true if there is another WAL {@link Entry} 098 */ 099 public boolean hasNext() throws IOException { 100 if (currentEntry == null) { 101 tryAdvanceEntry(); 102 } 103 return currentEntry != null; 104 } 105 106 /** 107 * Returns the next WAL entry in this stream but does not advance. 108 */ 109 public Entry peek() throws IOException { 110 return hasNext() ? currentEntry: null; 111 } 112 113 /** 114 * Returns the next WAL entry in this stream and advance the stream. 115 */ 116 public Entry next() throws IOException { 117 Entry save = peek(); 118 currentPositionOfEntry = currentPositionOfReader; 119 currentEntry = null; 120 return save; 121 } 122 123 /** 124 * {@inheritDoc} 125 */ 126 @Override 127 public void close() throws IOException { 128 closeReader(); 129 } 130 131 /** 132 * @return the position of the last Entry returned by next() 133 */ 134 public long getPosition() { 135 return currentPositionOfEntry; 136 } 137 138 /** 139 * @return the {@link Path} of the current WAL 140 */ 141 public Path getCurrentPath() { 142 return currentPath; 143 } 144 145 private String getCurrentPathStat() { 146 StringBuilder sb = new StringBuilder(); 147 if (currentPath != null) { 148 sb.append("currently replicating from: ").append(currentPath).append(" at position: ") 149 .append(currentPositionOfEntry).append("\n"); 150 } else { 151 sb.append("no replication ongoing, waiting for new log"); 152 } 153 return sb.toString(); 154 } 155 156 /** 157 * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned 158 * false) 159 */ 160 public void reset() throws IOException { 161 if (reader != null && currentPath != null) { 162 resetReader(); 163 } 164 } 165 166 private void setPosition(long position) { 167 currentPositionOfEntry = position; 168 } 169 170 private void setCurrentPath(Path path) { 171 this.currentPath = path; 172 } 173 174 private void tryAdvanceEntry() throws IOException { 175 if (checkReader()) { 176 boolean beingWritten = readNextEntryAndRecordReaderPosition(); 177 LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten); 178 if (currentEntry == null && !beingWritten) { 179 // no more entries in this log file, and the file is already closed, i.e, rolled 180 // Before dequeueing, we should always get one more attempt at reading. 181 // This is in case more entries came in after we opened the reader, and the log is rolled 182 // while we were reading. See HBASE-6758 183 resetReader(); 184 readNextEntryAndRecordReaderPosition(); 185 if (currentEntry == null) { 186 if (checkAllBytesParsed()) { // now we're certain we're done with this log file 187 dequeueCurrentLog(); 188 if (openNextLog()) { 189 readNextEntryAndRecordReaderPosition(); 190 } 191 } 192 } 193 } 194 // if currentEntry != null then just return 195 // if currentEntry == null but the file is still being written, then we should not switch to 196 // the next log either, just return here and try next time to see if there are more entries in 197 // the current file 198 } 199 // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) 200 } 201 202 // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file 203 private boolean checkAllBytesParsed() throws IOException { 204 // -1 means the wal wasn't closed cleanly. 205 final long trailerSize = currentTrailerSize(); 206 FileStatus stat = null; 207 try { 208 stat = fs.getFileStatus(this.currentPath); 209 } catch (IOException exception) { 210 LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", 211 currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat()); 212 metrics.incrUnknownFileLengthForClosedWAL(); 213 } 214 // Here we use currentPositionOfReader instead of currentPositionOfEntry. 215 // We only call this method when currentEntry is null so usually they are the same, but there 216 // are two exceptions. One is we have nothing in the file but only a header, in this way 217 // the currentPositionOfEntry will always be 0 since we have no change to update it. The other 218 // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the 219 // last valid entry, and the currentPositionOfReader will usually point to the end of the file. 220 if (stat != null) { 221 if (trailerSize < 0) { 222 if (currentPositionOfReader < stat.getLen()) { 223 final long skippedBytes = stat.getLen() - currentPositionOfReader; 224 LOG.debug( 225 "Reached the end of WAL file '{}'. It was not closed cleanly," + 226 " so we did not parse {} bytes of data. This is normally ok.", 227 currentPath, skippedBytes); 228 metrics.incrUncleanlyClosedWALs(); 229 metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); 230 } 231 } else if (currentPositionOfReader + trailerSize < stat.getLen()) { 232 LOG.warn( 233 "Processing end of WAL file '{}'. At position {}, which is too far away from" + 234 " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", 235 currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); 236 setPosition(0); 237 resetReader(); 238 metrics.incrRestartedWALReading(); 239 metrics.incrRepeatedFileBytes(currentPositionOfReader); 240 return false; 241 } 242 } 243 if (LOG.isTraceEnabled()) { 244 LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + 245 (stat == null ? "N/A" : stat.getLen())); 246 } 247 metrics.incrCompletedWAL(); 248 return true; 249 } 250 251 private void dequeueCurrentLog() throws IOException { 252 LOG.debug("Reached the end of log {}", currentPath); 253 closeReader(); 254 logQueue.remove(); 255 setPosition(0); 256 metrics.decrSizeOfLogQueue(); 257 } 258 259 /** 260 * Returns whether the file is opened for writing. 261 */ 262 private boolean readNextEntryAndRecordReaderPosition() throws IOException { 263 Entry readEntry = reader.next(); 264 long readerPos = reader.getPosition(); 265 OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); 266 if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { 267 // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted 268 // data, so we need to make sure that we do not read beyond the committed file length. 269 if (LOG.isDebugEnabled()) { 270 LOG.debug("The provider tells us the valid length for " + currentPath + " is " + 271 fileLength.getAsLong() + ", but we have advanced to " + readerPos); 272 } 273 resetReader(); 274 return true; 275 } 276 if (readEntry != null) { 277 LOG.trace("reading entry: {} ", readEntry); 278 metrics.incrLogEditsRead(); 279 metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); 280 } 281 currentEntry = readEntry; // could be null 282 this.currentPositionOfReader = readerPos; 283 return fileLength.isPresent(); 284 } 285 286 private void closeReader() throws IOException { 287 if (reader != null) { 288 reader.close(); 289 reader = null; 290 } 291 } 292 293 // if we don't have a reader, open a reader on the next log 294 private boolean checkReader() throws IOException { 295 if (reader == null) { 296 return openNextLog(); 297 } 298 return true; 299 } 300 301 // open a reader on the next log in queue 302 private boolean openNextLog() throws IOException { 303 Path nextPath = logQueue.peek(); 304 if (nextPath != null) { 305 openReader(nextPath); 306 if (reader != null) { 307 return true; 308 } 309 } else { 310 // no more files in queue, this could only happen for recovered queue. 311 setCurrentPath(null); 312 } 313 return false; 314 } 315 316 private Path getArchivedLog(Path path) throws IOException { 317 Path walRootDir = CommonFSUtils.getWALRootDir(conf); 318 319 // Try found the log in old dir 320 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 321 Path archivedLogLocation = new Path(oldLogDir, path.getName()); 322 if (fs.exists(archivedLogLocation)) { 323 LOG.info("Log " + path + " was moved to " + archivedLogLocation); 324 return archivedLogLocation; 325 } 326 327 // Try found the log in the seperate old log dir 328 oldLogDir = 329 new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) 330 .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); 331 archivedLogLocation = new Path(oldLogDir, path.getName()); 332 if (fs.exists(archivedLogLocation)) { 333 LOG.info("Log " + path + " was moved to " + archivedLogLocation); 334 return archivedLogLocation; 335 } 336 337 LOG.error("Couldn't locate log: " + path); 338 return path; 339 } 340 341 private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { 342 // If the log was archived, continue reading from there 343 Path archivedLog = getArchivedLog(path); 344 if (!path.equals(archivedLog)) { 345 openReader(archivedLog); 346 } else { 347 throw fnfe; 348 } 349 } 350 351 private void openReader(Path path) throws IOException { 352 try { 353 // Detect if this is a new file, if so get a new reader else 354 // reset the current reader so that we see the new data 355 if (reader == null || !getCurrentPath().equals(path)) { 356 closeReader(); 357 reader = WALFactory.createReader(fs, path, conf); 358 seek(); 359 setCurrentPath(path); 360 } else { 361 resetReader(); 362 } 363 } catch (FileNotFoundException fnfe) { 364 handleFileNotFound(path, fnfe); 365 } catch (RemoteException re) { 366 IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); 367 if (!(ioe instanceof FileNotFoundException)) throw ioe; 368 handleFileNotFound(path, (FileNotFoundException)ioe); 369 } catch (LeaseNotRecoveredException lnre) { 370 // HBASE-15019 the WAL was not closed due to some hiccup. 371 LOG.warn("Try to recover the WAL lease " + currentPath, lnre); 372 recoverLease(conf, currentPath); 373 reader = null; 374 } catch (NullPointerException npe) { 375 // Workaround for race condition in HDFS-4380 376 // which throws a NPE if we open a file before any data node has the most recent block 377 // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 378 LOG.warn("Got NPE opening reader, will retry."); 379 reader = null; 380 } 381 } 382 383 // For HBASE-15019 384 private void recoverLease(final Configuration conf, final Path path) { 385 try { 386 final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf); 387 RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { 388 @Override 389 public boolean progress() { 390 LOG.debug("recover WAL lease: " + path); 391 return true; 392 } 393 }); 394 } catch (IOException e) { 395 LOG.warn("unable to recover lease for WAL: " + path, e); 396 } 397 } 398 399 private void resetReader() throws IOException { 400 try { 401 currentEntry = null; 402 reader.reset(); 403 seek(); 404 } catch (FileNotFoundException fnfe) { 405 // If the log was archived, continue reading from there 406 Path archivedLog = getArchivedLog(currentPath); 407 if (!currentPath.equals(archivedLog)) { 408 openReader(archivedLog); 409 } else { 410 throw fnfe; 411 } 412 } catch (NullPointerException npe) { 413 throw new IOException("NPE resetting reader, likely HDFS-4380", npe); 414 } 415 } 416 417 private void seek() throws IOException { 418 if (currentPositionOfEntry != 0) { 419 reader.seek(currentPositionOfEntry); 420 } 421 } 422 423 private long currentTrailerSize() { 424 long size = -1L; 425 if (reader instanceof ProtobufLogReader) { 426 final ProtobufLogReader pblr = (ProtobufLogReader) reader; 427 size = pblr.trailerSize(); 428 } 429 return size; 430 } 431}