/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hbase.wal.WALTailingReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
 * dequeues it and starts reading from the next.
 * <p/>
 * Typical usage is a loop over {@link #hasNext()}, calling {@link #peek()} or {@link #next()} only
 * when it returns {@link HasNext#YES}, and sleeping/retrying or closing the stream for the other
 * return values.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class WALEntryStream implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);

  private WALTailingReader reader;
  // state returned by the last read attempt; null means "nothing to reset before next read"
  private WALTailingReader.State state;
  private Path currentPath;
  // cache of next entry for hasNext()
  private Entry currentEntry;
  // position for the current entry. As now we support peek, which means that the upper layer may
  // choose to return before reading the current entry, so it is not safe to return the value below
  // in getPosition.
  private long currentPositionOfEntry = 0;
  // position after reading current entry
  private long currentPositionOfReader = 0;
  private final ReplicationSourceLogQueue logQueue;
  private final String walGroupId;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALFileLengthProvider walFileLengthProvider;
  private final MetricsSource metrics;

  // we should be able to skip empty WAL files, but for safety, we still provide this config
  // see HBASE-18137 for more details
  private final boolean eofAutoRecovery;

  /**
   * Create an entry stream over the given queue at the given start position
   * @param logQueue              the queue of WAL paths
   * @param fs                    the {@link FileSystem} the WAL files are read from
   * @param conf                  the {@link Configuration} to use to create {@link WALStreamReader}
   *                              for this stream
   * @param startPosition         the position in the first WAL to start reading at
   * @param walFileLengthProvider provides the length of the WAL file
   * @param metrics               the replication metrics
   * @param walGroupId            the id of the WAL group in {@code logQueue} which this stream
   *                              reads from
   */
  public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
    long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
    String walGroupId) {
    this.logQueue = logQueue;
    this.fs = fs;
    this.conf = conf;
    this.currentPositionOfEntry = startPosition;
    this.walFileLengthProvider = walFileLengthProvider;
    this.metrics = metrics;
    this.walGroupId = walGroupId;
    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
  }

  public enum HasNext {
    /** means there is a new entry and you could use peek or next to get current entry */
    YES,
    /**
     * means there is something wrong or we have reached EOF of the current file but it is not
     * closed yet and there is no new file in the replication queue yet, you should sleep a while
     * and try to call hasNext again
     */
    RETRY,
    /**
     * Usually this means we have finished reading a WAL file, and to simplify the implementation
     * of this class, we just let the upper layer issue a new hasNext call again to open the next
     * WAL file.
     */
    RETRY_IMMEDIATELY,
    /**
     * means there is no new entry and the stream has ended, the upper layer should close this
     * stream and release other resources as well
     */
    NO
  }

  /**
   * Try advance the stream if there is no entry yet. See the javadoc for {@link HasNext} for more
   * details about the meanings of the return values.
   * <p/>
   * You can call {@link #peek()} or {@link #next()} to get the actual {@link Entry} if this method
   * returns {@link HasNext#YES}.
   */
  public HasNext hasNext() {
    if (currentEntry == null) {
      return tryAdvanceEntry();
    } else {
      return HasNext.YES;
    }
  }

  /**
   * Returns the next WAL entry in this stream but does not advance.
   * <p/>
   * Must call {@link #hasNext()} first before calling this method, and if you have already called
   * {@link #next()} to consume the current entry, you need to call {@link #hasNext()} again to
   * advance the stream before calling this method again, otherwise it will always return
   * {@code null}
   * <p/>
   * The reason here is that, we need to use the return value of {@link #hasNext()} to tell upper
   * layer to retry or not, so we can not wrap the {@link #hasNext()} call inside {@link #peek()} or
   * {@link #next()} as they have their own return value.
   * @see #hasNext()
   * @see #next()
   */
  public Entry peek() {
    return currentEntry;
  }

  /**
   * Returns the next WAL entry in this stream and advance the stream. Will throw
   * {@link IllegalStateException} if you do not call {@link #hasNext()} before calling this method.
   * Please see the javadoc of {@link #peek()} method to see why we need this.
   * @throws IllegalStateException Every time you want to call this method, please call
   *                               {@link #hasNext()} first, otherwise a
   *                               {@link IllegalStateException} will be thrown.
   * @see #hasNext()
   * @see #peek()
   */
  public Entry next() {
    if (currentEntry == null) {
      throw new IllegalStateException("Call hasNext first");
    }
    Entry save = peek();
    // the entry is consumed now, so the "entry position" catches up with the reader position
    currentPositionOfEntry = currentPositionOfReader;
    currentEntry = null;
    state = null;
    return save;
  }

  /**
   * Closes the underlying WAL reader, if one is open. Safe to call when no reader is open.
   */
  @Override
  public void close() {
    closeReader();
  }

  /**
   * Returns the position just after the last Entry returned by {@link #next()}. Before any entry
   * has been consumed from the current file this is the start position given to the constructor,
   * or 0 after moving on to a new WAL file.
   */
  public long getPosition() {
    return currentPositionOfEntry;
  }

  /** Returns the {@link Path} of the current WAL, or {@code null} if no WAL is being read */
  public Path getCurrentPath() {
    return currentPath;
  }

  /** Builds a human readable description of the current path and position, used in log lines. */
  private String getCurrentPathStat() {
    StringBuilder sb = new StringBuilder();
    if (currentPath != null) {
      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
        .append(currentPositionOfEntry).append("\n");
    } else {
      sb.append("no replication ongoing, waiting for new log");
    }
    return sb.toString();
  }

  private void setCurrentPath(Path path) {
    this.currentPath = path;
  }

  /**
   * Makes sure there is a reader which is ready for reading the next entry.
   * <ul>
   * <li>If a reader is already open and the last read ended in a non-NORMAL state, reset it to the
   * position of the current entry and reuse it.</li>
   * <li>If a reader is already open and no reset is needed, reuse it as is.</li>
   * <li>Otherwise open a reader on the head of the log queue.</li>
   * </ul>
   * @return {@link HasNext#YES} if the reader is ready, {@link HasNext#NO} if the queue is empty,
   *         otherwise one of the retry values telling the upper layer how to proceed
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
      justification = "HDFS-4380")
  private HasNext prepareReader() {
    if (reader != null) {
      if (state != null && state != WALTailingReader.State.NORMAL) {
        // reset before reading
        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
          currentPositionOfEntry, state.resetCompression());
        try {
          if (currentPositionOfEntry > 0) {
            reader.resetTo(currentPositionOfEntry, state.resetCompression());
          } else {
            // we will read from the beginning so we should always clear the compression context
            reader.resetTo(-1, true);
          }
        } catch (IOException e) {
          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
            currentPositionOfEntry, state.resetCompression(), e);
          // just leave the state as is, and try resetting next time
          return HasNext.RETRY;
        }
      }
      // The existing reader is usable, either untouched or freshly reset. We must return here:
      // falling through would open a second reader for the same file and overwrite the field
      // without closing the one we have just reset, leaking it.
      return HasNext.YES;
    }
    // try open next WAL file
    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
    Path nextPath = queue.peek();
    if (nextPath == null) {
      LOG.debug("No more WAL files in queue");
      // no more files in queue, this could happen for recovered queue, or for a wal group of a
      // sync replication peer which has already been transited to DA or S.
      setCurrentPath(null);
      return HasNext.NO;
    }
    setCurrentPath(nextPath);
    // we need to test this prior to create the reader. If not, it is possible that, while
    // opening the file, the file is still being written so its header is incomplete and we get
    // a header EOF, but then while we test whether it is still being written, we have already
    // flushed the data out and we consider it is not being written, and then we just skip over
    // file, then we will lose the data written after opening...
    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
      currentPositionOfEntry, beingWritten);
    try {
      reader = WALFactory.createTailingReader(fs, nextPath, conf,
        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
      return HasNext.YES;
    } catch (WALHeaderEOFException e) {
      if (!eofAutoRecovery) {
        // if we do not enable EOF auto recovery, just let the upper layer retry
        // the replication will be stuck usually, and need to be fixed manually
        return HasNext.RETRY;
      }
      // we hit EOF while reading the WAL header, usually this means we can just skip over this
      // file, but we need to be careful that whether this file is still being written, if so we
      // should retry instead of skipping.
      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
        currentPositionOfEntry, e);
      if (beingWritten) {
        // just retry as the file is still being written, maybe next time we could read
        // something
        return HasNext.RETRY;
      } else {
        // the file is not being written so we are safe to just skip over it
        dequeueCurrentLog();
        return HasNext.RETRY_IMMEDIATELY;
      }
    } catch (LeaseNotRecoveredException e) {
      // HBASE-15019 the WAL was not closed due to some hiccup.
      LOG.warn("Try to recover the WAL lease {}", nextPath, e);
      AbstractFSWALProvider.recoverLease(conf, nextPath);
      return HasNext.RETRY;
    } catch (IOException | NullPointerException e) {
      // For why we need to catch NPE here, see HDFS-4380 for more details
      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
      return HasNext.RETRY;
    }
  }

  /**
   * Called when we have hit EOF on a WAL file which is no longer being written. Resets the reader
   * and tries to read once more, in case entries were appended after we opened the reader, before
   * deciding whether the file can be dequeued.
   */
  private HasNext lastAttempt() {
    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
      currentPositionOfEntry, state.resetCompression());
    try {
      // NOTE(review): prepareReader() uses resetTo(-1, true) when currentPositionOfEntry is 0,
      // while here the position is always passed through as is. Confirm against
      // WALTailingReader#resetTo whether position 0 needs the same treatment here.
      reader.resetTo(currentPositionOfEntry, state.resetCompression());
    } catch (IOException e) {
      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
        currentPositionOfEntry, state.resetCompression(), e);
      // just leave the state as is, next time we will try to reset it again, but there is a
      // nasty problem is that, we will still reach here finally and try reset again to see if
      // the log has been fully replicated, which is redundant, can be optimized later
      return HasNext.RETRY;
    }
    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
    state = pair.getFirst();
    // should not be written
    assert !pair.getSecond();
    if (!state.eof()) {
      // we still have something to read after reopen, so return YES. Or there is something wrong
      // and we need to retry
      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
    }
    // No data available after reopen
    if (checkAllBytesParsed()) {
      // move to the next wal file and read
      dequeueCurrentLog();
      return HasNext.RETRY_IMMEDIATELY;
    } else {
      // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
      // so when calling tryAdvanceEntry next time we will reset the reader to the beginning
      // and read.
      currentPositionOfEntry = 0;
      currentPositionOfReader = 0;
      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
      return HasNext.RETRY;
    }
  }

  /**
   * Prepares the reader and tries to read one entry into {@link #currentEntry}, translating the
   * reader state into the {@link HasNext} value to return to the upper layer.
   */
  private HasNext tryAdvanceEntry() {
    HasNext prepared = prepareReader();
    if (prepared != HasNext.YES) {
      return prepared;
    }

    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
    state = pair.getFirst();
    boolean beingWritten = pair.getSecond();
    LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state,
      beingWritten);
    // The below implementation needs to make sure that when beingWritten == true, we should not
    // dequeue the current WAL file in logQueue.
    switch (state) {
      case NORMAL:
        // everything is fine, just return
        return HasNext.YES;
      case EOF_WITH_TRAILER:
        // in readNextEntryAndRecordReaderPosition, we will acquire rollWriteLock, and we can only
        // schedule a close writer task, in which we will write trailer, under the rollWriteLock, so
        // typically if beingWritten == true, we should not reach here, as we need to reopen the
        // reader after writing the trailer. The only possible way to reach here while beingWritten
        // == true is due to the inflightWALClosures logic in AbstractFSWAL, as if the writer is
        // still in this map, we will consider it as beingWritten, but actually, here we could make
        // sure that the new WAL file has already been enqueued into the logQueue, so here dequeuing
        // the current log file is safe.
        if (beingWritten && logQueue.getQueue(walGroupId).size() <= 1) {
          // As explained above, if we implement everything correctly, we should not arrive here.
          // But anyway, even if we reach here due to some code changes in the future, reading
          // the file again can make sure that we will not accidentally consider the queue as
          // finished, and since there is a trailer, we will soon consider the file as finished
          // and move on.
          LOG.warn(
            "We have reached the trailer while reading the file '{}' which is currently"
              + " beingWritten, but it is the last file in log queue {}. This should not happen"
              + " typically, try to read again so we will not miss anything",
            currentPath, walGroupId);
          return HasNext.RETRY;
        }
        assert !beingWritten || logQueue.getQueue(walGroupId).size() > 1;
        // we have reached the trailer, which means this WAL file has been closed cleanly and we
        // have finished reading it successfully, just move to the next WAL file and let the upper
        // layer start reading the next WAL file
        dequeueCurrentLog();
        return HasNext.RETRY_IMMEDIATELY;
      case EOF_AND_RESET:
      case EOF_AND_RESET_COMPRESSION:
        if (beingWritten) {
          // just sleep a bit and retry to see if there are new entries coming since the file is
          // still being written
          return HasNext.RETRY;
        }
        // no more entries in this log file, and the file is already closed, i.e, rolled
        // Before dequeuing, we should always get one more attempt at reading.
        // This is in case more entries came in after we opened the reader, and the log is rolled
        // while we were reading. See HBASE-6758
        return lastAttempt();
      case ERROR_AND_RESET:
      case ERROR_AND_RESET_COMPRESSION:
        // we have met an error, just sleep a bit and retry again
        return HasNext.RETRY;
      default:
        throw new IllegalArgumentException("Unknown read next result: " + state);
    }
  }

  /**
   * Returns the {@link FileStatus} of the current WAL, falling back to its archived location if
   * the file is no longer at {@link #currentPath}.
   * @throws IOException if the status can not be fetched; the original
   *                     {@link FileNotFoundException} is rethrown when no archived copy is found
   */
  private FileStatus getCurrentPathFileStatus() throws IOException {
    try {
      return fs.getFileStatus(currentPath);
    } catch (FileNotFoundException e) {
      // try archived path
      Path archivedWAL = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
      if (archivedWAL != null) {
        return fs.getFileStatus(archivedWAL);
      } else {
        throw e;
      }
    }
  }

  /**
   * HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file.
   * @return {@code false} only when the file has a trailer and the reader position plus the
   *         trailer size is still short of the reported file length, in which case the caller
   *         should restart reading the file; {@code true} otherwise, including when the file
   *         length can not be determined
   */
  private boolean checkAllBytesParsed() {
    // -1 means the wal wasn't closed cleanly.
    final long trailerSize = currentTrailerSize();
    FileStatus stat = null;
    try {
      stat = getCurrentPathFileStatus();
    } catch (IOException e) {
      // best effort: without the file length we can not verify anything, so we only record the
      // fact in metrics and treat the file as completed below
      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat(), e);
      metrics.incrUnknownFileLengthForClosedWAL();
    }
    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
    // We only call this method when currentEntry is null so usually they are the same, but there
    // are two exceptions. One is we have nothing in the file but only a header, in this way
    // the currentPositionOfEntry will always be 0 since we have no chance to update it. The other
    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
    if (stat != null) {
      if (trailerSize < 0) {
        if (currentPositionOfReader < stat.getLen()) {
          final long skippedBytes = stat.getLen() - currentPositionOfReader;
          // See the commits in HBASE-25924/HBASE-25932 for context.
          LOG.warn("Reached the end of WAL {}. It was not closed cleanly,"
            + " so we did not parse {} bytes of data.", currentPath, skippedBytes);
          metrics.incrUncleanlyClosedWALs();
          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
        }
      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
        LOG.warn(
          "Processing end of WAL {} at position {}, which is too far away from"
            + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
        metrics.incrRestartedWALReading();
        metrics.incrRepeatedFileBytes(currentPositionOfReader);
        return false;
      }
    }
    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
      stat == null ? "N/A" : stat.getLen());
    metrics.incrCompletedWAL();
    return true;
  }

  /**
   * Finishes the current WAL: closes the reader, removes the head of the log queue for this WAL
   * group and resets the path, entry position and state so the next call opens the next file.
   */
  private void dequeueCurrentLog() {
    LOG.debug("EOF, closing {}", currentPath);
    closeReader();
    logQueue.remove(walGroupId);
    setCurrentPath(null);
    currentPositionOfEntry = 0;
    state = null;
  }

  /**
   * Reads the next entry from the reader, updating {@link #currentEntry} and
   * {@link #currentPositionOfReader}.
   * @return a pair of the resulting reader state and whether the file is considered as opened for
   *         writing. The second value is always {@code false} when there is more than one file in
   *         the queue, as the file length provider is not consulted in that case.
   */
  private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() {
    OptionalLong fileLength;
    if (logQueue.getQueueSize(walGroupId) > 1) {
      // if there are more than one files in queue, although it is possible that we are
      // still trying to write the trailer of the file and it is not closed yet, we can
      // make sure that we will not write any WAL entries to it any more, so it is safe
      // to just let the upper layer try to read the whole file without limit
      fileLength = OptionalLong.empty();
    } else {
      // if there is only one file in queue, check whether it is still being written to
      // we must call this before actually reading from the reader, as this method will acquire the
      // rollWriteLock. This is very important, as we will enqueue the new WAL file in postLogRoll,
      // and before this happens, we could have already finished closing the previous WAL file. If
      // we do not acquire the rollWriteLock and return whether the current file is being written
      // to, we may finish reading the previous WAL file and start to read the next one, before it
      // is enqueued into the logQueue, thus lead to an empty logQueue and make the shipper think
      // the queue is already ended and quit. See HBASE-28114 and related issues for more details.
      // in the future, if we want to optimize the logic here, for example, do not call this method
      // every time, or do not acquire rollWriteLock in the implementation of this method, we need
      // to carefully review the optimized implementation
      fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
    }
    WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1));
    long readerPos = readResult.getEntryEndPos();
    Entry readEntry = readResult.getEntry();
    if (readResult.getState() == WALTailingReader.State.NORMAL) {
      LOG.trace("reading entry: {} ", readEntry);
      metrics.incrLogEditsRead();
      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
      // record current entry and reader position
      currentEntry = readEntry;
      this.currentPositionOfReader = readerPos;
    } else {
      LOG.trace("reading entry failed with: {}", readResult.getState());
      // set current entry to null
      currentEntry = null;
      try {
        this.currentPositionOfReader = reader.getPosition();
      } catch (IOException e) {
        LOG.warn("failed to get current position of reader", e);
        if (readResult.getState().resetCompression()) {
          return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION,
            fileLength.isPresent());
        }
        // otherwise keep the state reported by the reader; currentPositionOfReader is left at
        // its previous value
      }
    }
    return Pair.newPair(readResult.getState(), fileLength.isPresent());
  }

  /** Closes and forgets the current reader, if any. */
  private void closeReader() {
    if (reader != null) {
      reader.close();
      reader = null;
    }
  }

  /**
   * Returns the trailer size reported by the current reader when it is an
   * {@link AbstractProtobufWALReader}, otherwise -1 (also when there is no reader).
   */
  private long currentTrailerSize() {
    long size = -1L;
    if (reader instanceof AbstractProtobufWALReader) {
      final AbstractProtobufWALReader pbwr = (AbstractProtobufWALReader) reader;
      size = pbwr.trailerSize();
    }
    return size;
  }
}