001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.Closeable; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.util.OptionalLong; 024import java.util.concurrent.PriorityBlockingQueue; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 031import org.apache.hadoop.hbase.util.CancelableProgressable; 032import org.apache.hadoop.hbase.util.CommonFSUtils; 033import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 034import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 036import org.apache.hadoop.hbase.wal.WAL.Entry; 037import org.apache.hadoop.hbase.wal.WAL.Reader; 038import org.apache.hadoop.hbase.wal.WALFactory; 039import org.apache.hadoop.ipc.RemoteException; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.yetus.audience.InterfaceStability; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually 047 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it 048 * dequeues it and starts reading from the next. 049 */ 050@InterfaceAudience.Private 051@InterfaceStability.Evolving 052class WALEntryStream implements Closeable { 053 private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); 054 055 private Reader reader; 056 private Path currentPath; 057 // cache of next entry for hasNext() 058 private Entry currentEntry; 059 // position for the current entry. As now we support peek, which means that the upper layer may 060 // choose to return before reading the current entry, so it is not safe to return the value below 061 // in getPosition. 062 private long currentPositionOfEntry = 0; 063 // position after reading current entry 064 private long currentPositionOfReader = 0; 065 private final ReplicationSourceLogQueue logQueue; 066 private final String walGroupId; 067 private final FileSystem fs; 068 private final Configuration conf; 069 private final WALFileLengthProvider walFileLengthProvider; 070 // which region server the WALs belong to 071 private final ServerName serverName; 072 private final MetricsSource metrics; 073 074 /** 075 * Create an entry stream over the given queue at the given start position 076 * @param logQueue the queue of WAL paths 077 * @param conf the {@link Configuration} to use to create {@link Reader} for this 078 * stream 079 * @param startPosition the position in the first WAL to start reading at 080 * @param walFileLengthProvider provides the length of the WAL file 081 * @param serverName the server name which all WALs belong to 082 * @param metrics the replication metrics 083 * @throws IOException throw IO exception from stream 084 */ 085 public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition, 086 WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics, 087 String walGroupId) throws IOException { 088 this.logQueue = logQueue; 089 this.fs = CommonFSUtils.getWALFileSystem(conf); 090 this.conf = conf; 091 this.currentPositionOfEntry = startPosition; 092 this.walFileLengthProvider = walFileLengthProvider; 093 this.serverName = serverName; 094 this.metrics = metrics; 095 this.walGroupId = walGroupId; 096 } 097 098 /** Returns true if there is another WAL {@link Entry} */ 099 public boolean hasNext() throws IOException { 100 if (currentEntry == null) { 101 tryAdvanceEntry(); 102 } 103 return currentEntry != null; 104 } 105 106 /** 107 * Returns the next WAL entry in this stream but does not advance. 108 */ 109 public Entry peek() throws IOException { 110 return hasNext() ? currentEntry : null; 111 } 112 113 /** 114 * Returns the next WAL entry in this stream and advance the stream. 115 */ 116 public Entry next() throws IOException { 117 Entry save = peek(); 118 currentPositionOfEntry = currentPositionOfReader; 119 currentEntry = null; 120 return save; 121 } 122 123 /** 124 * {@inheritDoc} 125 */ 126 @Override 127 public void close() throws IOException { 128 closeReader(); 129 } 130 131 /** Returns the position of the last Entry returned by next() */ 132 public long getPosition() { 133 return currentPositionOfEntry; 134 } 135 136 /** Returns the {@link Path} of the current WAL */ 137 public Path getCurrentPath() { 138 return currentPath; 139 } 140 141 private String getCurrentPathStat() { 142 StringBuilder sb = new StringBuilder(); 143 if (currentPath != null) { 144 sb.append("currently replicating from: ").append(currentPath).append(" at position: ") 145 .append(currentPositionOfEntry).append("\n"); 146 } else { 147 sb.append("no replication ongoing, waiting for new log"); 148 } 149 return sb.toString(); 150 } 151 152 /** 153 * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned 154 * false) 155 */ 156 public void reset() throws IOException { 157 if (reader != null && currentPath != null) { 158 resetReader(); 159 } 160 } 161 162 private void setPosition(long position) { 163 currentPositionOfEntry = position; 164 } 165 166 private void setCurrentPath(Path path) { 167 this.currentPath = path; 168 } 169 170 private void tryAdvanceEntry() throws IOException { 171 if (checkReader()) { 172 boolean beingWritten = readNextEntryAndRecordReaderPosition(); 173 LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten); 174 if (currentEntry == null && !beingWritten) { 175 // no more entries in this log file, and the file is already closed, i.e, rolled 176 // Before dequeueing, we should always get one more attempt at reading. 177 // This is in case more entries came in after we opened the reader, and the log is rolled 178 // while we were reading. See HBASE-6758 179 resetReader(); 180 readNextEntryAndRecordReaderPosition(); 181 if (currentEntry == null) { 182 if (checkAllBytesParsed()) { // now we're certain we're done with this log file 183 dequeueCurrentLog(); 184 if (openNextLog()) { 185 readNextEntryAndRecordReaderPosition(); 186 } 187 } 188 } 189 } 190 // if currentEntry != null then just return 191 // if currentEntry == null but the file is still being written, then we should not switch to 192 // the next log either, just return here and try next time to see if there are more entries in 193 // the current file 194 } 195 // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) 196 } 197 198 // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file 199 private boolean checkAllBytesParsed() throws IOException { 200 // -1 means the wal wasn't closed cleanly. 201 final long trailerSize = currentTrailerSize(); 202 FileStatus stat = null; 203 try { 204 stat = fs.getFileStatus(this.currentPath); 205 } catch (IOException exception) { 206 LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", 207 currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat()); 208 metrics.incrUnknownFileLengthForClosedWAL(); 209 } 210 // Here we use currentPositionOfReader instead of currentPositionOfEntry. 211 // We only call this method when currentEntry is null so usually they are the same, but there 212 // are two exceptions. One is we have nothing in the file but only a header, in this way 213 // the currentPositionOfEntry will always be 0 since we have no change to update it. The other 214 // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the 215 // last valid entry, and the currentPositionOfReader will usually point to the end of the file. 216 if (stat != null) { 217 if (trailerSize < 0) { 218 if (currentPositionOfReader < stat.getLen()) { 219 final long skippedBytes = stat.getLen() - currentPositionOfReader; 220 // See the commits in HBASE-25924/HBASE-25932 for context. 221 LOG.warn("Reached the end of WAL {}. It was not closed cleanly," 222 + " so we did not parse {} bytes of data.", currentPath, skippedBytes); 223 metrics.incrUncleanlyClosedWALs(); 224 metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); 225 } 226 } else if (currentPositionOfReader + trailerSize < stat.getLen()) { 227 LOG.warn( 228 "Processing end of WAL {} at position {}, which is too far away from" 229 + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", 230 currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); 231 setPosition(0); 232 resetReader(); 233 metrics.incrRestartedWALReading(); 234 metrics.incrRepeatedFileBytes(currentPositionOfReader); 235 return false; 236 } 237 } 238 if (LOG.isTraceEnabled()) { 239 LOG.trace("Reached the end of " + this.currentPath + " and length of the file is " 240 + (stat == null ? "N/A" : stat.getLen())); 241 } 242 metrics.incrCompletedWAL(); 243 return true; 244 } 245 246 private void dequeueCurrentLog() throws IOException { 247 LOG.debug("EOF, closing {}", currentPath); 248 closeReader(); 249 logQueue.remove(walGroupId); 250 setCurrentPath(null); 251 setPosition(0); 252 } 253 254 /** 255 * Returns whether the file is opened for writing. 256 */ 257 private boolean readNextEntryAndRecordReaderPosition() throws IOException { 258 Entry readEntry = reader.next(); 259 long readerPos = reader.getPosition(); 260 OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); 261 if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { 262 // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted 263 // data, so we need to make sure that we do not read beyond the committed file length. 264 if (LOG.isDebugEnabled()) { 265 LOG.debug("The provider tells us the valid length for " + currentPath + " is " 266 + fileLength.getAsLong() + ", but we have advanced to " + readerPos); 267 } 268 resetReader(); 269 return true; 270 } 271 if (readEntry != null) { 272 LOG.trace("reading entry: {} ", readEntry); 273 metrics.incrLogEditsRead(); 274 metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); 275 } 276 currentEntry = readEntry; // could be null 277 this.currentPositionOfReader = readerPos; 278 return fileLength.isPresent(); 279 } 280 281 private void closeReader() throws IOException { 282 if (reader != null) { 283 reader.close(); 284 reader = null; 285 } 286 } 287 288 // if we don't have a reader, open a reader on the next log 289 private boolean checkReader() throws IOException { 290 if (reader == null) { 291 return openNextLog(); 292 } 293 return true; 294 } 295 296 // open a reader on the next log in queue 297 private boolean openNextLog() throws IOException { 298 PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); 299 Path nextPath = queue.peek(); 300 if (nextPath != null) { 301 openReader(nextPath); 302 if (reader != null) { 303 return true; 304 } 305 } else { 306 // no more files in queue, this could only happen for recovered queue. 307 setCurrentPath(null); 308 } 309 return false; 310 } 311 312 private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { 313 // If the log was archived, continue reading from there 314 Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf); 315 // archivedLog can be null if unable to locate in archiveDir. 316 if (archivedLog != null) { 317 openReader(archivedLog); 318 } else { 319 throw fnfe; 320 } 321 } 322 323 private void openReader(Path path) throws IOException { 324 try { 325 // Detect if this is a new file, if so get a new reader else 326 // reset the current reader so that we see the new data 327 if (reader == null || !getCurrentPath().equals(path)) { 328 closeReader(); 329 reader = WALFactory.createReader(fs, path, conf); 330 seek(); 331 setCurrentPath(path); 332 } else { 333 resetReader(); 334 } 335 } catch (FileNotFoundException fnfe) { 336 handleFileNotFound(path, fnfe); 337 } catch (RemoteException re) { 338 IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); 339 if (!(ioe instanceof FileNotFoundException)) { 340 throw ioe; 341 } 342 handleFileNotFound(path, (FileNotFoundException) ioe); 343 } catch (LeaseNotRecoveredException lnre) { 344 // HBASE-15019 the WAL was not closed due to some hiccup. 345 LOG.warn("Try to recover the WAL lease " + path, lnre); 346 recoverLease(conf, path); 347 reader = null; 348 } catch (NullPointerException npe) { 349 // Workaround for race condition in HDFS-4380 350 // which throws a NPE if we open a file before any data node has the most recent block 351 // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 352 LOG.warn("Got NPE opening reader, will retry."); 353 reader = null; 354 } 355 } 356 357 // For HBASE-15019 358 private void recoverLease(final Configuration conf, final Path path) { 359 try { 360 final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf); 361 RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { 362 @Override 363 public boolean progress() { 364 LOG.debug("recover WAL lease: " + path); 365 return true; 366 } 367 }); 368 } catch (IOException e) { 369 LOG.warn("unable to recover lease for WAL: " + path, e); 370 } 371 } 372 373 private void resetReader() throws IOException { 374 try { 375 currentEntry = null; 376 reader.reset(); 377 seek(); 378 } catch (FileNotFoundException fnfe) { 379 // If the log was archived, continue reading from there 380 Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf); 381 // archivedLog can be null if unable to locate in archiveDir. 382 if (archivedLog != null) { 383 openReader(archivedLog); 384 } else { 385 throw fnfe; 386 } 387 } catch (NullPointerException npe) { 388 throw new IOException("NPE resetting reader, likely HDFS-4380", npe); 389 } 390 } 391 392 private void seek() throws IOException { 393 if (currentPositionOfEntry != 0) { 394 reader.seek(currentPositionOfEntry); 395 } 396 } 397 398 private long currentTrailerSize() { 399 long size = -1L; 400 if (reader instanceof ProtobufLogReader) { 401 final ProtobufLogReader pblr = (ProtobufLogReader) reader; 402 size = pblr.trailerSize(); 403 } 404 return size; 405 } 406}