001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.io.Closeable; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.util.NoSuchElementException; 025import java.util.OptionalLong; 026import java.util.concurrent.PriorityBlockingQueue; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileStatus; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.yetus.audience.InterfaceStability; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 039import org.apache.hadoop.hbase.util.CancelableProgressable; 040import org.apache.hadoop.hbase.util.CommonFSUtils; 041import org.apache.hadoop.hbase.util.FSUtils; 042import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 043import org.apache.hadoop.hbase.wal.WAL.Entry; 044import org.apache.hadoop.hbase.wal.WAL.Reader; 045import org.apache.hadoop.hbase.wal.WALFactory; 046import org.apache.hadoop.ipc.RemoteException; 047 048/** 049 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually 050 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it 051 * dequeues it and starts reading from the next. 052 */ 053@InterfaceAudience.Private 054@InterfaceStability.Evolving 055class WALEntryStream implements Closeable { 056 private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); 057 058 private Reader reader; 059 private Path currentPath; 060 // cache of next entry for hasNext() 061 private Entry currentEntry; 062 // position after reading current entry 063 private long currentPosition = 0; 064 private final PriorityBlockingQueue<Path> logQueue; 065 private final FileSystem fs; 066 private final Configuration conf; 067 private final WALFileLengthProvider walFileLengthProvider; 068 // which region server the WALs belong to 069 private final ServerName serverName; 070 private final MetricsSource metrics; 071 072 /** 073 * Create an entry stream over the given queue at the given start position 074 * @param logQueue the queue of WAL paths 075 * @param conf the {@link Configuration} to use to create {@link Reader} for this stream 076 * @param startPosition the position in the first WAL to start reading at 077 * @param walFileLengthProvider provides the length of the WAL file 078 * @param serverName the server name which all WALs belong to 079 * @param metrics the replication metrics 080 * @throws IOException 081 */ 082 public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf, 083 long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, 084 MetricsSource metrics) throws IOException { 085 this.logQueue = logQueue; 086 this.fs = CommonFSUtils.getWALFileSystem(conf); 087 this.conf = conf; 088 this.currentPosition = startPosition; 089 this.walFileLengthProvider = walFileLengthProvider; 090 this.serverName = serverName; 091 this.metrics = metrics; 092 } 093 094 /** 095 * @return true if there is another WAL {@link Entry} 096 */ 097 public boolean hasNext() throws IOException { 098 if (currentEntry == null) { 099 tryAdvanceEntry(); 100 } 101 return currentEntry != null; 102 } 103 104 /** 105 * @return the next WAL entry in this stream 106 * @throws IOException 107 * @throws NoSuchElementException if no more entries in the stream. 108 */ 109 public Entry next() throws IOException { 110 if (!hasNext()) { 111 throw new NoSuchElementException(); 112 } 113 Entry save = currentEntry; 114 currentEntry = null; // gets reloaded by hasNext() 115 return save; 116 } 117 118 /** 119 * {@inheritDoc} 120 */ 121 @Override 122 public void close() throws IOException { 123 closeReader(); 124 } 125 126 /** 127 * @return the position of the last Entry returned by next() 128 */ 129 public long getPosition() { 130 return currentPosition; 131 } 132 133 /** 134 * @return the {@link Path} of the current WAL 135 */ 136 public Path getCurrentPath() { 137 return currentPath; 138 } 139 140 private String getCurrentPathStat() { 141 StringBuilder sb = new StringBuilder(); 142 if (currentPath != null) { 143 sb.append("currently replicating from: ").append(currentPath).append(" at position: ") 144 .append(currentPosition).append("\n"); 145 } else { 146 sb.append("no replication ongoing, waiting for new log"); 147 } 148 return sb.toString(); 149 } 150 151 /** 152 * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned 153 * false) 154 * @throws IOException 155 */ 156 public void reset() throws IOException { 157 if (reader != null && currentPath != null) { 158 resetReader(); 159 } 160 } 161 162 private void setPosition(long position) { 163 currentPosition = position; 164 } 165 166 private void setCurrentPath(Path path) { 167 this.currentPath = path; 168 } 169 170 private void tryAdvanceEntry() throws IOException { 171 if (checkReader()) { 172 boolean beingWritten = readNextEntryAndSetPosition(); 173 if (currentEntry == null && !beingWritten) { 174 // no more entries in this log file, and the file is already closed, i.e, rolled 175 // Before dequeueing, we should always get one more attempt at reading. 176 // This is in case more entries came in after we opened the reader, and the log is rolled 177 // while we were reading. See HBASE-6758 178 resetReader(); 179 readNextEntryAndSetPosition(); 180 if (currentEntry == null) { 181 if (checkAllBytesParsed()) { // now we're certain we're done with this log file 182 dequeueCurrentLog(); 183 if (openNextLog()) { 184 readNextEntryAndSetPosition(); 185 } 186 } 187 } 188 } 189 // if currentEntry != null then just return 190 // if currentEntry == null but the file is still being written, then we should not switch to 191 // the next log either, just return here and try next time to see if there are more entries in 192 // the current file 193 } 194 // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) 195 } 196 197 // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file 198 private boolean checkAllBytesParsed() throws IOException { 199 // -1 means the wal wasn't closed cleanly. 200 final long trailerSize = currentTrailerSize(); 201 FileStatus stat = null; 202 try { 203 stat = fs.getFileStatus(this.currentPath); 204 } catch (IOException exception) { 205 LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " 206 + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat()); 207 metrics.incrUnknownFileLengthForClosedWAL(); 208 } 209 if (stat != null) { 210 if (trailerSize < 0) { 211 if (currentPosition < stat.getLen()) { 212 final long skippedBytes = stat.getLen() - currentPosition; 213 if (LOG.isDebugEnabled()) { 214 LOG.debug("Reached the end of WAL file '" + currentPath 215 + "'. It was not closed cleanly, so we did not parse " + skippedBytes 216 + " bytes of data. This is normally ok."); 217 } 218 metrics.incrUncleanlyClosedWALs(); 219 metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); 220 } 221 } else if (currentPosition + trailerSize < stat.getLen()) { 222 LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition 223 + ", which is too far away from reported file length " + stat.getLen() 224 + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat()); 225 setPosition(0); 226 resetReader(); 227 metrics.incrRestartedWALReading(); 228 metrics.incrRepeatedFileBytes(currentPosition); 229 return false; 230 } 231 } 232 if (LOG.isTraceEnabled()) { 233 LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " 234 + (stat == null ? "N/A" : stat.getLen())); 235 } 236 metrics.incrCompletedWAL(); 237 return true; 238 } 239 240 private void dequeueCurrentLog() throws IOException { 241 if (LOG.isDebugEnabled()) { 242 LOG.debug("Reached the end of log " + currentPath); 243 } 244 closeReader(); 245 logQueue.remove(); 246 setPosition(0); 247 metrics.decrSizeOfLogQueue(); 248 } 249 250 /** 251 * Returns whether the file is opened for writing. 252 */ 253 private boolean readNextEntryAndSetPosition() throws IOException { 254 Entry readEntry = reader.next(); 255 long readerPos = reader.getPosition(); 256 OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); 257 if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { 258 // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted 259 // data, so we need to make sure that we do not read beyond the committed file length. 260 if (LOG.isDebugEnabled()) { 261 LOG.debug("The provider tells us the valid length for " + currentPath + " is " + 262 fileLength.getAsLong() + ", but we have advanced to " + readerPos); 263 } 264 resetReader(); 265 return true; 266 } 267 if (readEntry != null) { 268 metrics.incrLogEditsRead(); 269 metrics.incrLogReadInBytes(readerPos - currentPosition); 270 } 271 currentEntry = readEntry; // could be null 272 setPosition(readerPos); 273 return fileLength.isPresent(); 274 } 275 276 private void closeReader() throws IOException { 277 if (reader != null) { 278 reader.close(); 279 reader = null; 280 } 281 } 282 283 // if we don't have a reader, open a reader on the next log 284 private boolean checkReader() throws IOException { 285 if (reader == null) { 286 return openNextLog(); 287 } 288 return true; 289 } 290 291 // open a reader on the next log in queue 292 private boolean openNextLog() throws IOException { 293 Path nextPath = logQueue.peek(); 294 if (nextPath != null) { 295 openReader(nextPath); 296 if (reader != null) { 297 return true; 298 } 299 } 300 return false; 301 } 302 303 private Path getArchivedLog(Path path) throws IOException { 304 Path walRootDir = CommonFSUtils.getWALRootDir(conf); 305 306 // Try found the log in old dir 307 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 308 Path archivedLogLocation = new Path(oldLogDir, path.getName()); 309 if (fs.exists(archivedLogLocation)) { 310 LOG.info("Log " + path + " was moved to " + archivedLogLocation); 311 return archivedLogLocation; 312 } 313 314 // Try found the log in the seperate old log dir 315 oldLogDir = 316 new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) 317 .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); 318 archivedLogLocation = new Path(oldLogDir, path.getName()); 319 if (fs.exists(archivedLogLocation)) { 320 LOG.info("Log " + path + " was moved to " + archivedLogLocation); 321 return archivedLogLocation; 322 } 323 324 LOG.error("Couldn't locate log: " + path); 325 return path; 326 } 327 328 private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { 329 // If the log was archived, continue reading from there 330 Path archivedLog = getArchivedLog(path); 331 if (!path.equals(archivedLog)) { 332 openReader(archivedLog); 333 } else { 334 throw fnfe; 335 } 336 } 337 338 private void openReader(Path path) throws IOException { 339 try { 340 // Detect if this is a new file, if so get a new reader else 341 // reset the current reader so that we see the new data 342 if (reader == null || !getCurrentPath().equals(path)) { 343 closeReader(); 344 reader = WALFactory.createReader(fs, path, conf); 345 seek(); 346 setCurrentPath(path); 347 } else { 348 resetReader(); 349 } 350 } catch (FileNotFoundException fnfe) { 351 handleFileNotFound(path, fnfe); 352 } catch (RemoteException re) { 353 IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); 354 if (!(ioe instanceof FileNotFoundException)) throw ioe; 355 handleFileNotFound(path, (FileNotFoundException)ioe); 356 } catch (LeaseNotRecoveredException lnre) { 357 // HBASE-15019 the WAL was not closed due to some hiccup. 358 LOG.warn("Try to recover the WAL lease " + currentPath, lnre); 359 recoverLease(conf, currentPath); 360 reader = null; 361 } catch (NullPointerException npe) { 362 // Workaround for race condition in HDFS-4380 363 // which throws a NPE if we open a file before any data node has the most recent block 364 // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 365 LOG.warn("Got NPE opening reader, will retry."); 366 reader = null; 367 } 368 } 369 370 // For HBASE-15019 371 private void recoverLease(final Configuration conf, final Path path) { 372 try { 373 374 final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf); 375 FSUtils fsUtils = FSUtils.getInstance(dfs, conf); 376 fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { 377 @Override 378 public boolean progress() { 379 LOG.debug("recover WAL lease: " + path); 380 return true; 381 } 382 }); 383 } catch (IOException e) { 384 LOG.warn("unable to recover lease for WAL: " + path, e); 385 } 386 } 387 388 private void resetReader() throws IOException { 389 try { 390 reader.reset(); 391 seek(); 392 } catch (FileNotFoundException fnfe) { 393 // If the log was archived, continue reading from there 394 Path archivedLog = getArchivedLog(currentPath); 395 if (!currentPath.equals(archivedLog)) { 396 openReader(archivedLog); 397 } else { 398 throw fnfe; 399 } 400 } catch (NullPointerException npe) { 401 throw new IOException("NPE resetting reader, likely HDFS-4380", npe); 402 } 403 } 404 405 private void seek() throws IOException { 406 if (currentPosition != 0) { 407 reader.seek(currentPosition); 408 } 409 } 410 411 private long currentTrailerSize() { 412 long size = -1L; 413 if (reader instanceof ProtobufLogReader) { 414 final ProtobufLogReader pblr = (ProtobufLogReader) reader; 415 size = pblr.trailerSize(); 416 } 417 return size; 418 } 419}