001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.Closeable; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.util.OptionalLong; 024import java.util.concurrent.PriorityBlockingQueue; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader; 030import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException; 031import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 032import org.apache.hadoop.hbase.util.Pair; 033import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 034import org.apache.hadoop.hbase.wal.WAL.Entry; 035import org.apache.hadoop.hbase.wal.WALFactory; 036import org.apache.hadoop.hbase.wal.WALStreamReader; 037import org.apache.hadoop.hbase.wal.WALTailingReader; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.yetus.audience.InterfaceStability; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually 045 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it 046 * dequeues it and starts reading from the next. 047 */ 048@InterfaceAudience.Private 049@InterfaceStability.Evolving 050class WALEntryStream implements Closeable { 051 private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); 052 053 private WALTailingReader reader; 054 private WALTailingReader.State state; 055 private Path currentPath; 056 // cache of next entry for hasNext() 057 private Entry currentEntry; 058 // position for the current entry. As now we support peek, which means that the upper layer may 059 // choose to return before reading the current entry, so it is not safe to return the value below 060 // in getPosition. 061 private long currentPositionOfEntry = 0; 062 // position after reading current entry 063 private long currentPositionOfReader = 0; 064 private final ReplicationSourceLogQueue logQueue; 065 private final String walGroupId; 066 private final FileSystem fs; 067 private final Configuration conf; 068 private final WALFileLengthProvider walFileLengthProvider; 069 private final MetricsSource metrics; 070 071 // we should be able to skip empty WAL files, but for safety, we still provide this config 072 // see HBASE-18137 for more details 073 private boolean eofAutoRecovery; 074 075 /** 076 * Create an entry stream over the given queue at the given start position 077 * @param logQueue the queue of WAL paths 078 * @param conf the {@link Configuration} to use to create {@link WALStreamReader} 079 * for this stream 080 * @param startPosition the position in the first WAL to start reading at 081 * @param walFileLengthProvider provides the length of the WAL file 082 * @param serverName the server name which all WALs belong to 083 * @param metrics the replication metrics 084 */ 085 public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf, 086 long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics, 087 String walGroupId) { 088 this.logQueue = logQueue; 089 this.fs = fs; 090 this.conf = conf; 091 this.currentPositionOfEntry = startPosition; 092 this.walFileLengthProvider = walFileLengthProvider; 093 this.metrics = metrics; 094 this.walGroupId = walGroupId; 095 this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); 096 } 097 098 public enum HasNext { 099 /** means there is a new entry and you could use peek or next to get current entry */ 100 YES, 101 /** 102 * means there are something wrong or we have reached EOF of the current file but it is not 103 * closed yet and there is no new file in the replication queue yet, you should sleep a while 104 * and try to call hasNext again 105 */ 106 RETRY, 107 /** 108 * Usually this means we have finished reading a WAL file, and for simplify the implementation 109 * of this class, we just let the upper layer issue a new hasNext call again to open the next 110 * WAL file. 111 */ 112 RETRY_IMMEDIATELY, 113 /** 114 * means there is no new entry and stream is end, the upper layer should close this stream and 115 * release other resources as well 116 */ 117 NO 118 } 119 120 /** 121 * Try advance the stream if there is no entry yet. See the javadoc for {@link HasNext} for more 122 * details about the meanings of the return values. 123 * <p/> 124 * You can call {@link #peek()} or {@link #next()} to get the actual {@link Entry} if this method 125 * returns {@link HasNext#YES}. 126 */ 127 public HasNext hasNext() { 128 if (currentEntry == null) { 129 return tryAdvanceEntry(); 130 } else { 131 return HasNext.YES; 132 } 133 } 134 135 /** 136 * Returns the next WAL entry in this stream but does not advance. 137 * <p/> 138 * Must call {@link #hasNext()} first before calling this method, and if you have already called 139 * {@link #next()} to consume the current entry, you need to call {@link #hasNext()} again to 140 * advance the stream before calling this method again, otherwise it will always return 141 * {@code null} 142 * <p/> 143 * The reason here is that, we need to use the return value of {@link #hasNext()} to tell upper 144 * layer to retry or not, so we can not wrap the {@link #hasNext()} call inside {@link #peek()} or 145 * {@link #next()} as they have their own return value. 146 * @see #hasNext() 147 * @see #next() 148 */ 149 public Entry peek() { 150 return currentEntry; 151 } 152 153 /** 154 * Returns the next WAL entry in this stream and advance the stream. Will throw 155 * {@link IllegalStateException} if you do not call {@link #hasNext()} before calling this method. 156 * Please see the javadoc of {@link #peek()} method to see why we need this. 157 * @throws IllegalStateException Every time you want to call this method, please call 158 * {@link #hasNext()} first, otherwise a 159 * {@link IllegalStateException} will be thrown. 160 * @see #hasNext() 161 * @see #peek() 162 */ 163 public Entry next() { 164 if (currentEntry == null) { 165 throw new IllegalStateException("Call hasNext first"); 166 } 167 Entry save = peek(); 168 currentPositionOfEntry = currentPositionOfReader; 169 currentEntry = null; 170 state = null; 171 return save; 172 } 173 174 /** 175 * {@inheritDoc} 176 */ 177 @Override 178 public void close() { 179 closeReader(); 180 } 181 182 /** Returns the position of the last Entry returned by next() */ 183 public long getPosition() { 184 return currentPositionOfEntry; 185 } 186 187 /** Returns the {@link Path} of the current WAL */ 188 public Path getCurrentPath() { 189 return currentPath; 190 } 191 192 private String getCurrentPathStat() { 193 StringBuilder sb = new StringBuilder(); 194 if (currentPath != null) { 195 sb.append("currently replicating from: ").append(currentPath).append(" at position: ") 196 .append(currentPositionOfEntry).append("\n"); 197 } else { 198 sb.append("no replication ongoing, waiting for new log"); 199 } 200 return sb.toString(); 201 } 202 203 private void setCurrentPath(Path path) { 204 this.currentPath = path; 205 } 206 207 private void resetReader() throws IOException { 208 if (currentPositionOfEntry > 0) { 209 reader.resetTo(currentPositionOfEntry, state.resetCompression()); 210 } else { 211 // we will read from the beginning so we should always clear the compression context 212 reader.resetTo(-1, true); 213 } 214 } 215 216 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION", 217 justification = "HDFS-4380") 218 private HasNext prepareReader() { 219 if (reader != null) { 220 if (state != null && state != WALTailingReader.State.NORMAL) { 221 // reset before reading 222 LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath, 223 currentPositionOfEntry, state.resetCompression()); 224 try { 225 resetReader(); 226 return HasNext.YES; 227 } catch (IOException e) { 228 LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath, 229 currentPositionOfEntry, state.resetCompression(), e); 230 // just leave the state as is, and try resetting next time 231 return HasNext.RETRY; 232 } 233 } else { 234 return HasNext.YES; 235 } 236 } 237 // try open next WAL file 238 PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); 239 Path nextPath = queue.peek(); 240 if (nextPath == null) { 241 LOG.debug("No more WAL files in queue"); 242 // no more files in queue, this could happen for recovered queue, or for a wal group of a 243 // sync replication peer which has already been transited to DA or S. 244 setCurrentPath(null); 245 return HasNext.NO; 246 } 247 setCurrentPath(nextPath); 248 // we need to test this prior to create the reader. If not, it is possible that, while 249 // opening the file, the file is still being written so its header is incomplete and we get 250 // a header EOF, but then while we test whether it is still being written, we have already 251 // flushed the data out and we consider it is not being written, and then we just skip over 252 // file, then we will lose the data written after opening... 253 boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent(); 254 LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath, 255 currentPositionOfEntry, beingWritten); 256 try { 257 reader = WALFactory.createTailingReader(fs, nextPath, conf, 258 currentPositionOfEntry > 0 ? currentPositionOfEntry : -1); 259 return HasNext.YES; 260 } catch (WALHeaderEOFException e) { 261 if (!eofAutoRecovery) { 262 // if we do not enable EOF auto recovery, just let the upper layer retry 263 // the replication will be stuck usually, and need to be fixed manually 264 return HasNext.RETRY; 265 } 266 // we hit EOF while reading the WAL header, usually this means we can just skip over this 267 // file, but we need to be careful that whether this file is still being written, if so we 268 // should retry instead of skipping. 269 LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath, 270 currentPositionOfEntry, e); 271 if (beingWritten) { 272 // just retry as the file is still being written, maybe next time we could read 273 // something 274 return HasNext.RETRY; 275 } else { 276 // the file is not being written so we are safe to just skip over it 277 dequeueCurrentLog(); 278 return HasNext.RETRY_IMMEDIATELY; 279 } 280 } catch (LeaseNotRecoveredException e) { 281 // HBASE-15019 the WAL was not closed due to some hiccup. 282 LOG.warn("Try to recover the WAL lease " + nextPath, e); 283 AbstractFSWALProvider.recoverLease(conf, nextPath); 284 return HasNext.RETRY; 285 } catch (IOException | NullPointerException e) { 286 // For why we need to catch NPE here, see HDFS-4380 for more details 287 LOG.warn("Failed to open WAL reader for path: {}", nextPath, e); 288 return HasNext.RETRY; 289 } 290 } 291 292 private HasNext lastAttempt() { 293 LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath, 294 currentPositionOfEntry, state.resetCompression()); 295 try { 296 resetReader(); 297 } catch (IOException e) { 298 LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath, 299 currentPositionOfEntry, state.resetCompression(), e); 300 // just leave the state as is, next time we will try to reset it again, but there is a 301 // nasty problem is that, we will still reach here finally and try reset again to see if 302 // the log has been fully replicated, which is redundant, can be optimized later 303 return HasNext.RETRY; 304 } 305 Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition(); 306 state = pair.getFirst(); 307 // should not be written 308 assert !pair.getSecond(); 309 if (!state.eof()) { 310 // we still have something to read after reopen, so return YES. Or there are something wrong 311 // and we need to retry 312 return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY; 313 } 314 // No data available after reopen 315 if (checkAllBytesParsed()) { 316 // move to the next wal file and read 317 dequeueCurrentLog(); 318 return HasNext.RETRY_IMMEDIATELY; 319 } else { 320 // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from 321 // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION 322 // so when calling tryAdvanceENtry next time we will reset the reader to the beginning 323 // and read. 324 currentPositionOfEntry = 0; 325 currentPositionOfReader = 0; 326 state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION; 327 return HasNext.RETRY; 328 } 329 } 330 331 private HasNext tryAdvanceEntry() { 332 HasNext prepared = prepareReader(); 333 if (prepared != HasNext.YES) { 334 return prepared; 335 } 336 337 Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition(); 338 state = pair.getFirst(); 339 boolean beingWritten = pair.getSecond(); 340 LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state, 341 beingWritten); 342 // The below implementation needs to make sure that when beingWritten == true, we should not 343 // dequeue the current WAL file in logQueue. 344 switch (state) { 345 case NORMAL: 346 // everything is fine, just return 347 return HasNext.YES; 348 case EOF_WITH_TRAILER: 349 // in readNextEntryAndRecordReaderPosition, we will acquire rollWriteLock, and we can only 350 // schedule a close writer task, in which we will write trailer, under the rollWriteLock, so 351 // typically if beingWritten == true, we should not reach here, as we need to reopen the 352 // reader after writing the trailer. The only possible way to reach here while beingWritten 353 // == true is due to the inflightWALClosures logic in AbstractFSWAL, as if the writer is 354 // still in this map, we will consider it as beingWritten, but actually, here we could make 355 // sure that the new WAL file has already been enqueued into the logQueue, so here dequeuing 356 // the current log file is safe. 357 if (beingWritten && logQueue.getQueue(walGroupId).size() <= 1) { 358 // As explained above, if we implement everything correctly, we should not arrive here. 359 // But anyway, even if we reach here due to some code changes in the future, reading 360 // the file again can make sure that we will not accidentally consider the queue as 361 // finished, and since there is a trailer, we will soon consider the file as finished 362 // and move on. 363 LOG.warn( 364 "We have reached the trailer while reading the file '{}' which is currently" 365 + " beingWritten, but it is the last file in log queue {}. This should not happen" 366 + " typically, try to read again so we will not miss anything", 367 currentPath, walGroupId); 368 return HasNext.RETRY; 369 } 370 assert !beingWritten || logQueue.getQueue(walGroupId).size() > 1; 371 // we have reached the trailer, which means this WAL file has been closed cleanly and we 372 // have finished reading it successfully, just move to the next WAL file and let the upper 373 // layer start reading the next WAL file 374 dequeueCurrentLog(); 375 return HasNext.RETRY_IMMEDIATELY; 376 case EOF_AND_RESET: 377 case EOF_AND_RESET_COMPRESSION: 378 if (beingWritten) { 379 // just sleep a bit and retry to see if there are new entries coming since the file is 380 // still being written 381 return HasNext.RETRY; 382 } 383 // no more entries in this log file, and the file is already closed, i.e, rolled 384 // Before dequeuing, we should always get one more attempt at reading. 385 // This is in case more entries came in after we opened the reader, and the log is rolled 386 // while we were reading. See HBASE-6758 387 return lastAttempt(); 388 case ERROR_AND_RESET: 389 case ERROR_AND_RESET_COMPRESSION: 390 // we have meet an error, just sleep a bit and retry again 391 return HasNext.RETRY; 392 default: 393 throw new IllegalArgumentException("Unknown read next result: " + state); 394 } 395 } 396 397 private FileStatus getCurrentPathFileStatus() throws IOException { 398 try { 399 return fs.getFileStatus(currentPath); 400 } catch (FileNotFoundException e) { 401 // try archived path 402 Path archivedWAL = AbstractFSWALProvider.findArchivedLog(currentPath, conf); 403 if (archivedWAL != null) { 404 return fs.getFileStatus(archivedWAL); 405 } else { 406 throw e; 407 } 408 } 409 } 410 411 // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file 412 private boolean checkAllBytesParsed() { 413 // -1 means the wal wasn't closed cleanly. 414 final long trailerSize = currentTrailerSize(); 415 FileStatus stat = null; 416 try { 417 stat = getCurrentPathFileStatus(); 418 } catch (IOException e) { 419 LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", 420 currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat(), e); 421 metrics.incrUnknownFileLengthForClosedWAL(); 422 } 423 // Here we use currentPositionOfReader instead of currentPositionOfEntry. 424 // We only call this method when currentEntry is null so usually they are the same, but there 425 // are two exceptions. One is we have nothing in the file but only a header, in this way 426 // the currentPositionOfEntry will always be 0 since we have no change to update it. The other 427 // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the 428 // last valid entry, and the currentPositionOfReader will usually point to the end of the file. 429 if (stat != null) { 430 if (trailerSize < 0) { 431 if (currentPositionOfReader < stat.getLen()) { 432 final long skippedBytes = stat.getLen() - currentPositionOfReader; 433 // See the commits in HBASE-25924/HBASE-25932 for context. 434 LOG.warn("Reached the end of WAL {}. It was not closed cleanly," 435 + " so we did not parse {} bytes of data.", currentPath, skippedBytes); 436 metrics.incrUncleanlyClosedWALs(); 437 metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); 438 } 439 } else if (currentPositionOfReader + trailerSize < stat.getLen()) { 440 LOG.warn( 441 "Processing end of WAL {} at position {}, which is too far away from" 442 + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", 443 currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); 444 metrics.incrRestartedWALReading(); 445 metrics.incrRepeatedFileBytes(currentPositionOfReader); 446 return false; 447 } 448 } 449 LOG.debug("Reached the end of {} and length of the file is {}", currentPath, 450 stat == null ? "N/A" : stat.getLen()); 451 metrics.incrCompletedWAL(); 452 return true; 453 } 454 455 private void dequeueCurrentLog() { 456 LOG.debug("EOF, closing {}", currentPath); 457 closeReader(); 458 logQueue.remove(walGroupId); 459 setCurrentPath(null); 460 currentPositionOfEntry = 0; 461 state = null; 462 } 463 464 /** 465 * Returns whether the file is opened for writing. 466 */ 467 private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() { 468 OptionalLong fileLength; 469 if (logQueue.getQueueSize(walGroupId) > 1) { 470 // if there are more than one files in queue, although it is possible that we are 471 // still trying to write the trailer of the file and it is not closed yet, we can 472 // make sure that we will not write any WAL entries to it any more, so it is safe 473 // to just let the upper layer try to read the whole file without limit 474 fileLength = OptionalLong.empty(); 475 } else { 476 // if there is only one file in queue, check whether it is still being written to 477 // we must call this before actually reading from the reader, as this method will acquire the 478 // rollWriteLock. This is very important, as we will enqueue the new WAL file in postLogRoll, 479 // and before this happens, we could have already finished closing the previous WAL file. If 480 // we do not acquire the rollWriteLock and return whether the current file is being written 481 // to, we may finish reading the previous WAL file and start to read the next one, before it 482 // is enqueued into the logQueue, thus lead to an empty logQueue and make the shipper think 483 // the queue is already ended and quit. See HBASE-28114 and related issues for more details. 484 // in the future, if we want to optimize the logic here, for example, do not call this method 485 // every time, or do not acquire rollWriteLock in the implementation of this method, we need 486 // to carefully review the optimized implementation 487 fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); 488 } 489 WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1)); 490 long readerPos = readResult.getEntryEndPos(); 491 Entry readEntry = readResult.getEntry(); 492 if (readResult.getState() == WALTailingReader.State.NORMAL) { 493 LOG.trace("reading entry: {} ", readEntry); 494 metrics.incrLogEditsRead(); 495 metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); 496 // record current entry and reader position 497 currentEntry = readResult.getEntry(); 498 this.currentPositionOfReader = readerPos; 499 } else { 500 LOG.trace("reading entry failed with: {}", readResult.getState()); 501 // set current entry to null 502 currentEntry = null; 503 try { 504 this.currentPositionOfReader = reader.getPosition(); 505 } catch (IOException e) { 506 LOG.warn("failed to get current position of reader", e); 507 if (readResult.getState().resetCompression()) { 508 return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION, 509 fileLength.isPresent()); 510 } 511 } 512 } 513 return Pair.newPair(readResult.getState(), fileLength.isPresent()); 514 } 515 516 private void closeReader() { 517 if (reader != null) { 518 reader.close(); 519 reader = null; 520 } 521 } 522 523 private long currentTrailerSize() { 524 long size = -1L; 525 if (reader instanceof AbstractProtobufWALReader) { 526 final AbstractProtobufWALReader pbwr = (AbstractProtobufWALReader) reader; 527 size = pbwr.trailerSize(); 528 } 529 return size; 530 } 531}