/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and
 * continually iterates through all the WAL {@link Entry} in the queue. When it's done reading
 * from a Path, it dequeues it and starts reading from the next.
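 * <p>
 * A rough sketch of how a caller might drive this stream; the surrounding objects (the log
 * queue, the length provider, the metrics source) are assumed to already exist, and the variable
 * names are illustrative only:
 *
 * <pre>{@code
 * try (WALEntryStream entryStream =
 *     new WALEntryStream(logQueue, conf, startPosition, lengthProvider, serverName, metrics)) {
 *   while (entryStream.hasNext()) {
 *     Entry entry = entryStream.next();
 *     // ship the entry somewhere, then remember how far we have read
 *     long lastPosition = entryStream.getPosition();
 *   }
 * }
 * }</pre>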
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class WALEntryStream implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);

  private Reader reader;
  private Path currentPath;
  // cache of next entry for hasNext()
  private Entry currentEntry;
  // position of the current entry. Since we support peek(), the upper layer may choose to return
  // before actually consuming the current entry, so it is not safe to expose the reader's
  // position below through getPosition(); this value is only advanced in next().
  private long currentPositionOfEntry = 0;
  // position after reading the current entry
  private long currentPositionOfReader = 0;
  private final PriorityBlockingQueue<Path> logQueue;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALFileLengthProvider walFileLengthProvider;
  // which region server the WALs belong to
  private final ServerName serverName;
  private final MetricsSource metrics;

  /**
   * Create an entry stream over the given queue at the given start position.
   * @param logQueue the queue of WAL paths
   * @param conf the {@link Configuration} to use to create a {@link Reader} for this stream
   * @param startPosition the position in the first WAL to start reading at
   * @param walFileLengthProvider provides the length of the WAL file
   * @param serverName the server name which all WALs belong to
   * @param metrics the replication metrics
   * @throws IOException if the WAL {@link FileSystem} cannot be obtained
   */
  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
      MetricsSource metrics) throws IOException {
    this.logQueue = logQueue;
    this.fs = CommonFSUtils.getWALFileSystem(conf);
    this.conf = conf;
    this.currentPositionOfEntry = startPosition;
    this.walFileLengthProvider = walFileLengthProvider;
    this.serverName = serverName;
    this.metrics = metrics;
  }

  /**
   * @return true if there is another WAL {@link Entry}
   */
  public boolean hasNext() throws IOException {
    if (currentEntry == null) {
      tryAdvanceEntry();
    }
    return currentEntry != null;
  }

  /**
   * Returns the next WAL entry in this stream but does not advance.
   */
  public Entry peek() throws IOException {
    return hasNext() ? currentEntry : null;
  }

  /**
   * Returns the next WAL entry in this stream and advances the stream.
   */
  public Entry next() throws IOException {
    Entry save = peek();
    currentPositionOfEntry = currentPositionOfReader;
    currentEntry = null;
    return save;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void close() throws IOException {
    closeReader();
  }

  /**
   * @return the position of the last Entry returned by next()
   */
  public long getPosition() {
    return currentPositionOfEntry;
  }

  /**
   * @return the {@link Path} of the current WAL
   */
  public Path getCurrentPath() {
    return currentPath;
  }
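  /**
   * @return a short human-readable description of where we currently are in the stream, for use
   *         in log messages.
   */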
  private String getCurrentPathStat() {
    StringBuilder sb = new StringBuilder();
    if (currentPath != null) {
      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(currentPositionOfEntry).append("\n");
    } else {
      sb.append("no replication ongoing, waiting for new log");
    }
    return sb.toString();
  }

  /**
   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
   * false).
   */
  public void reset() throws IOException {
    if (reader != null && currentPath != null) {
      resetReader();
    }
  }

  private void setPosition(long position) {
    currentPositionOfEntry = position;
  }

  private void setCurrentPath(Path path) {
    this.currentPath = path;
  }
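  /**
   * Tries to advance {@link #currentEntry} to the next WAL entry, switching to the next file in
   * the queue once the current one has been completely read and closed (i.e. rolled).
   */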
  private void tryAdvanceEntry() throws IOException {
    if (checkReader()) {
      boolean beingWritten = readNextEntryAndRecordReaderPosition();
      if (currentEntry == null && !beingWritten) {
        // no more entries in this log file, and the file is already closed, i.e., rolled
        // Before dequeueing, we should always get one more attempt at reading.
        // This is in case more entries came in after we opened the reader, and the log was rolled
        // while we were reading. See HBASE-6758
        resetReader();
        readNextEntryAndRecordReaderPosition();
        if (currentEntry == null) {
          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
            dequeueCurrentLog();
            if (openNextLog()) {
              readNextEntryAndRecordReaderPosition();
            }
          }
        }
      }
      // if currentEntry != null then just return
      // if currentEntry == null but the file is still being written, then we should not switch to
      // the next log either; just return here and try next time to see if there are more entries
      // in the current file
    }
    // do nothing if we don't have a WAL Reader (e.g. if there are no logs in the queue)
  }

  // HBASE-15984: check to see that we have in fact parsed all data in a cleanly closed file
  private boolean checkAllBytesParsed() throws IOException {
    // -1 means the WAL wasn't closed cleanly.
    final long trailerSize = currentTrailerSize();
    FileStatus stat = null;
    try {
      stat = fs.getFileStatus(this.currentPath);
    } catch (IOException exception) {
      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
      metrics.incrUnknownFileLengthForClosedWAL();
    }
    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
    // We only call this method when currentEntry is null, so usually they are the same, but there
    // are two exceptions. One is that the file contains nothing but a header, in which case
    // currentPositionOfEntry will always be 0 since we never had a chance to update it. The other
    // is that we reached the end of the file, in which case currentPositionOfEntry points to the
    // tail of the last valid entry while currentPositionOfReader usually points to the end of the
    // file.
    if (stat != null) {
      if (trailerSize < 0) {
        if (currentPositionOfReader < stat.getLen()) {
          final long skippedBytes = stat.getLen() - currentPositionOfReader;
          LOG.debug(
            "Reached the end of WAL file '{}'. It was not closed cleanly," +
              " so we did not parse {} bytes of data. This is normally ok.",
            currentPath, skippedBytes);
          metrics.incrUncleanlyClosedWALs();
          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
        }
      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
        LOG.warn(
          "Processing end of WAL file '{}'. At position {}, which is too far away from" +
            " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
        setPosition(0);
        resetReader();
        metrics.incrRestartedWALReading();
        metrics.incrRepeatedFileBytes(currentPositionOfReader);
        return false;
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
        (stat == null ? "N/A" : stat.getLen()));
    }
    metrics.incrCompletedWAL();
    return true;
  }

  private void dequeueCurrentLog() throws IOException {
    LOG.debug("Reached the end of log {}", currentPath);
    closeReader();
    logQueue.remove();
    setPosition(0);
    metrics.decrSizeOfLogQueue();
  }

  /**
   * Returns whether the file is opened for writing.
   */
  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
    Entry readEntry = reader.next();
    long readerPos = reader.getPosition();
    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
      // See HBASE-14004: for AsyncFSWAL, which uses fan-out, it is possible that we read
      // uncommitted data, so we need to make sure that we do not read beyond the committed file
      // length.
      if (LOG.isDebugEnabled()) {
        LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
          fileLength.getAsLong() + ", but we have advanced to " + readerPos);
      }
      resetReader();
      return true;
    }
    if (readEntry != null) {
      metrics.incrLogEditsRead();
      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
    }
    currentEntry = readEntry; // could be null
    this.currentPositionOfReader = readerPos;
    return fileLength.isPresent();
  }

  private void closeReader() throws IOException {
    if (reader != null) {
      reader.close();
      reader = null;
    }
  }

  // if we don't have a reader, open a reader on the next log
  private boolean checkReader() throws IOException {
    if (reader == null) {
      return openNextLog();
    }
    return true;
  }

  // open a reader on the next log in queue
  private boolean openNextLog() throws IOException {
    Path nextPath = logQueue.peek();
    if (nextPath != null) {
      openReader(nextPath);
      if (reader != null) {
        return true;
      }
    } else {
      // no more files in the queue; this can only happen for a recovered queue.
      setCurrentPath(null);
    }
    return false;
  }
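  /**
   * Tries to locate the given WAL under the archive directories: first the shared old WALs
   * directory, then the per-server subdirectory underneath it. Returns the archived location if
   * found, otherwise the original path unchanged.
   */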
  private Path getArchivedLog(Path path) throws IOException {
    Path walRootDir = CommonFSUtils.getWALRootDir(conf);

    // Try to find the log in the old log dir
    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    Path archivedLogLocation = new Path(oldLogDir, path.getName());
    if (fs.exists(archivedLogLocation)) {
      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
      return archivedLogLocation;
    }

    // Try to find the log in the separate per-server old log dir
    oldLogDir =
      new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
        .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
    archivedLogLocation = new Path(oldLogDir, path.getName());
    if (fs.exists(archivedLogLocation)) {
      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
      return archivedLogLocation;
    }

    LOG.error("Couldn't locate log: " + path);
    return path;
  }

  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
    // If the log was archived, continue reading from there
    Path archivedLog = getArchivedLog(path);
    if (!path.equals(archivedLog)) {
      openReader(archivedLog);
    } else {
      throw fnfe;
    }
  }
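  /**
   * Opens a reader on the given path, or resets the existing reader if we are already positioned
   * on that path. Falls back to the archived location if the file has been moved, and leaves the
   * reader null (so the caller retries later) when the WAL lease has not been recovered yet
   * (HBASE-15019) or when we hit the HDFS-4380 NPE race.
   */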
  private void openReader(Path path) throws IOException {
    try {
      // Detect if this is a new file; if so, get a new reader, else
      // reset the current reader so that we see the new data
      if (reader == null || !getCurrentPath().equals(path)) {
        closeReader();
        reader = WALFactory.createReader(fs, path, conf);
        seek();
        setCurrentPath(path);
      } else {
        resetReader();
      }
    } catch (FileNotFoundException fnfe) {
      handleFileNotFound(path, fnfe);
    } catch (RemoteException re) {
      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
      if (!(ioe instanceof FileNotFoundException)) {
        throw ioe;
      }
      handleFileNotFound(path, (FileNotFoundException) ioe);
    } catch (LeaseNotRecoveredException lnre) {
      // HBASE-15019: the WAL was not closed due to some hiccup.
      LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
      recoverLease(conf, currentPath);
      reader = null;
    } catch (NullPointerException npe) {
      // Workaround for the race condition in HDFS-4380, which throws an NPE if we open a file
      // before any data node has the most recent block. Just retry; this will require re-reading
      // compressed WALs for the compressionContext.
      LOG.warn("Got NPE opening reader, will retry.");
      reader = null;
    }
  }

  // For HBASE-15019
  private void recoverLease(final Configuration conf, final Path path) {
    try {
      final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
        @Override
        public boolean progress() {
          LOG.debug("recover WAL lease: " + path);
          return true;
        }
      });
    } catch (IOException e) {
      LOG.warn("unable to recover lease for WAL: " + path, e);
    }
  }

  private void resetReader() throws IOException {
    try {
      currentEntry = null;
      reader.reset();
      seek();
    } catch (FileNotFoundException fnfe) {
      // If the log was archived, continue reading from there
      Path archivedLog = getArchivedLog(currentPath);
      if (!currentPath.equals(archivedLog)) {
        openReader(archivedLog);
      } else {
        throw fnfe;
      }
    } catch (NullPointerException npe) {
      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
    }
  }

  private void seek() throws IOException {
    if (currentPositionOfEntry != 0) {
      reader.seek(currentPositionOfEntry);
    }
  }

  private long currentTrailerSize() {
    long size = -1L;
    if (reader instanceof ProtobufLogReader) {
      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
      size = pblr.trailerSize();
    }
    return size;
  }
}