001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.util.OptionalLong;
024import java.util.concurrent.PriorityBlockingQueue;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
030import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
031import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
032import org.apache.hadoop.hbase.util.Pair;
033import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
034import org.apache.hadoop.hbase.wal.WAL.Entry;
035import org.apache.hadoop.hbase.wal.WALFactory;
036import org.apache.hadoop.hbase.wal.WALStreamReader;
037import org.apache.hadoop.hbase.wal.WALTailingReader;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
045 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
046 * dequeues it and starts reading from the next.
047 */
048@InterfaceAudience.Private
049@InterfaceStability.Evolving
050class WALEntryStream implements Closeable {
051  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
052
053  private WALTailingReader reader;
054  private WALTailingReader.State state;
055  private Path currentPath;
056  // cache of next entry for hasNext()
057  private Entry currentEntry;
058  // position for the current entry. As now we support peek, which means that the upper layer may
059  // choose to return before reading the current entry, so it is not safe to return the value below
060  // in getPosition.
061  private long currentPositionOfEntry = 0;
062  // position after reading current entry
063  private long currentPositionOfReader = 0;
064  private final ReplicationSourceLogQueue logQueue;
065  private final String walGroupId;
066  private final FileSystem fs;
067  private final Configuration conf;
068  private final WALFileLengthProvider walFileLengthProvider;
069  private final MetricsSource metrics;
070
071  // we should be able to skip empty WAL files, but for safety, we still provide this config
072  // see HBASE-18137 for more details
073  private boolean eofAutoRecovery;
074
075  /**
076   * Create an entry stream over the given queue at the given start position
077   * @param logQueue              the queue of WAL paths
078   * @param conf                  the {@link Configuration} to use to create {@link WALStreamReader}
079   *                              for this stream
080   * @param startPosition         the position in the first WAL to start reading at
081   * @param walFileLengthProvider provides the length of the WAL file
082   * @param serverName            the server name which all WALs belong to
083   * @param metrics               the replication metrics
084   */
085  public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
086    long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
087    String walGroupId) {
088    this.logQueue = logQueue;
089    this.fs = fs;
090    this.conf = conf;
091    this.currentPositionOfEntry = startPosition;
092    this.walFileLengthProvider = walFileLengthProvider;
093    this.metrics = metrics;
094    this.walGroupId = walGroupId;
095    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
096  }
097
  /** The possible outcomes of a {@link #hasNext()} call. */
  public enum HasNext {
    /** There is a new entry; use {@link #peek()} or {@link #next()} to get the current entry. */
    YES,
    /**
     * Something went wrong, or we have reached the EOF of the current file but it is not closed
     * yet and there is no new file in the replication queue yet. The caller should sleep a while
     * and then call {@link #hasNext()} again.
     */
    RETRY,
    /**
     * Usually this means we have finished reading a WAL file. To simplify the implementation of
     * this class, we just let the upper layer issue a new {@link #hasNext()} call to open the next
     * WAL file.
     */
    RETRY_IMMEDIATELY,
    /**
     * There is no new entry and the stream has ended. The upper layer should close this stream and
     * release other resources as well.
     */
    NO
  }
119
120  /**
121   * Try advance the stream if there is no entry yet. See the javadoc for {@link HasNext} for more
122   * details about the meanings of the return values.
123   * <p/>
124   * You can call {@link #peek()} or {@link #next()} to get the actual {@link Entry} if this method
125   * returns {@link HasNext#YES}.
126   */
127  public HasNext hasNext() {
128    if (currentEntry == null) {
129      return tryAdvanceEntry();
130    } else {
131      return HasNext.YES;
132    }
133  }
134
135  /**
136   * Returns the next WAL entry in this stream but does not advance.
137   * <p/>
138   * Must call {@link #hasNext()} first before calling this method, and if you have already called
139   * {@link #next()} to consume the current entry, you need to call {@link #hasNext()} again to
140   * advance the stream before calling this method again, otherwise it will always return
141   * {@code null}
142   * <p/>
143   * The reason here is that, we need to use the return value of {@link #hasNext()} to tell upper
144   * layer to retry or not, so we can not wrap the {@link #hasNext()} call inside {@link #peek()} or
145   * {@link #next()} as they have their own return value.
146   * @see #hasNext()
147   * @see #next()
148   */
149  public Entry peek() {
150    return currentEntry;
151  }
152
153  /**
154   * Returns the next WAL entry in this stream and advance the stream. Will throw
155   * {@link IllegalStateException} if you do not call {@link #hasNext()} before calling this method.
156   * Please see the javadoc of {@link #peek()} method to see why we need this.
157   * @throws IllegalStateException Every time you want to call this method, please call
158   *                               {@link #hasNext()} first, otherwise a
159   *                               {@link IllegalStateException} will be thrown.
160   * @see #hasNext()
161   * @see #peek()
162   */
163  public Entry next() {
164    if (currentEntry == null) {
165      throw new IllegalStateException("Call hasNext first");
166    }
167    Entry save = peek();
168    currentPositionOfEntry = currentPositionOfReader;
169    currentEntry = null;
170    state = null;
171    return save;
172  }
173
174  /**
175   * {@inheritDoc}
176   */
177  @Override
178  public void close() {
179    closeReader();
180  }
181
182  /** Returns the position of the last Entry returned by next() */
183  public long getPosition() {
184    return currentPositionOfEntry;
185  }
186
187  /** Returns the {@link Path} of the current WAL */
188  public Path getCurrentPath() {
189    return currentPath;
190  }
191
192  private String getCurrentPathStat() {
193    StringBuilder sb = new StringBuilder();
194    if (currentPath != null) {
195      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
196        .append(currentPositionOfEntry).append("\n");
197    } else {
198      sb.append("no replication ongoing, waiting for new log");
199    }
200    return sb.toString();
201  }
202
203  private void setCurrentPath(Path path) {
204    this.currentPath = path;
205  }
206
207  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
208      justification = "HDFS-4380")
209  private HasNext prepareReader() {
210    if (reader != null) {
211      if (state != null && state != WALTailingReader.State.NORMAL) {
212        // reset before reading
213        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
214          currentPositionOfEntry, state.resetCompression());
215        try {
216          if (currentPositionOfEntry > 0) {
217            reader.resetTo(currentPositionOfEntry, state.resetCompression());
218          } else {
219            // we will read from the beginning so we should always clear the compression context
220            reader.resetTo(-1, true);
221          }
222        } catch (IOException e) {
223          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
224            currentPositionOfEntry, state.resetCompression(), e);
225          // just leave the state as is, and try resetting next time
226          return HasNext.RETRY;
227        }
228      } else {
229        return HasNext.YES;
230      }
231    }
232    // try open next WAL file
233    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
234    Path nextPath = queue.peek();
235    if (nextPath == null) {
236      LOG.debug("No more WAL files in queue");
237      // no more files in queue, this could happen for recovered queue, or for a wal group of a
238      // sync replication peer which has already been transited to DA or S.
239      setCurrentPath(null);
240      return HasNext.NO;
241    }
242    setCurrentPath(nextPath);
243    // we need to test this prior to create the reader. If not, it is possible that, while
244    // opening the file, the file is still being written so its header is incomplete and we get
245    // a header EOF, but then while we test whether it is still being written, we have already
246    // flushed the data out and we consider it is not being written, and then we just skip over
247    // file, then we will lose the data written after opening...
248    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
249    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
250      currentPositionOfEntry, beingWritten);
251    try {
252      reader = WALFactory.createTailingReader(fs, nextPath, conf,
253        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
254      return HasNext.YES;
255    } catch (WALHeaderEOFException e) {
256      if (!eofAutoRecovery) {
257        // if we do not enable EOF auto recovery, just let the upper layer retry
258        // the replication will be stuck usually, and need to be fixed manually
259        return HasNext.RETRY;
260      }
261      // we hit EOF while reading the WAL header, usually this means we can just skip over this
262      // file, but we need to be careful that whether this file is still being written, if so we
263      // should retry instead of skipping.
264      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
265        currentPositionOfEntry, e);
266      if (beingWritten) {
267        // just retry as the file is still being written, maybe next time we could read
268        // something
269        return HasNext.RETRY;
270      } else {
271        // the file is not being written so we are safe to just skip over it
272        dequeueCurrentLog();
273        return HasNext.RETRY_IMMEDIATELY;
274      }
275    } catch (LeaseNotRecoveredException e) {
276      // HBASE-15019 the WAL was not closed due to some hiccup.
277      LOG.warn("Try to recover the WAL lease " + nextPath, e);
278      AbstractFSWALProvider.recoverLease(conf, nextPath);
279      return HasNext.RETRY;
280    } catch (IOException | NullPointerException e) {
281      // For why we need to catch NPE here, see HDFS-4380 for more details
282      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
283      return HasNext.RETRY;
284    }
285  }
286
287  private HasNext lastAttempt() {
288    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
289      currentPositionOfEntry, state.resetCompression());
290    try {
291      reader.resetTo(currentPositionOfEntry, state.resetCompression());
292    } catch (IOException e) {
293      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
294        currentPositionOfEntry, state.resetCompression(), e);
295      // just leave the state as is, next time we will try to reset it again, but there is a
296      // nasty problem is that, we will still reach here finally and try reset again to see if
297      // the log has been fully replicated, which is redundant, can be optimized later
298      return HasNext.RETRY;
299    }
300    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
301    state = pair.getFirst();
302    // should not be written
303    assert !pair.getSecond();
304    if (!state.eof()) {
305      // we still have something to read after reopen, so return YES. Or there are something wrong
306      // and we need to retry
307      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
308    }
309    // No data available after reopen
310    if (checkAllBytesParsed()) {
311      // move to the next wal file and read
312      dequeueCurrentLog();
313      return HasNext.RETRY_IMMEDIATELY;
314    } else {
315      // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
316      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
317      // so when calling tryAdvanceENtry next time we will reset the reader to the beginning
318      // and read.
319      currentPositionOfEntry = 0;
320      currentPositionOfReader = 0;
321      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
322      return HasNext.RETRY;
323    }
324  }
325
326  private HasNext tryAdvanceEntry() {
327    HasNext prepared = prepareReader();
328    if (prepared != HasNext.YES) {
329      return prepared;
330    }
331
332    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
333    state = pair.getFirst();
334    boolean beingWritten = pair.getSecond();
335    LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state,
336      beingWritten);
337    // The below implementation needs to make sure that when beingWritten == true, we should not
338    // dequeue the current WAL file in logQueue.
339    switch (state) {
340      case NORMAL:
341        // everything is fine, just return
342        return HasNext.YES;
343      case EOF_WITH_TRAILER:
344        // in readNextEntryAndRecordReaderPosition, we will acquire rollWriteLock, and we can only
345        // schedule a close writer task, in which we will write trailer, under the rollWriteLock, so
346        // typically if beingWritten == true, we should not reach here, as we need to reopen the
347        // reader after writing the trailer. The only possible way to reach here while beingWritten
348        // == true is due to the inflightWALClosures logic in AbstractFSWAL, as if the writer is
349        // still in this map, we will consider it as beingWritten, but actually, here we could make
350        // sure that the new WAL file has already been enqueued into the logQueue, so here dequeuing
351        // the current log file is safe.
352        if (beingWritten && logQueue.getQueue(walGroupId).size() <= 1) {
353          // As explained above, if we implement everything correctly, we should not arrive here.
354          // But anyway, even if we reach here due to some code changes in the future, reading
355          // the file again can make sure that we will not accidentally consider the queue as
356          // finished, and since there is a trailer, we will soon consider the file as finished
357          // and move on.
358          LOG.warn(
359            "We have reached the trailer while reading the file '{}' which is currently"
360              + " beingWritten, but it is the last file in log queue {}. This should not happen"
361              + " typically, try to read again so we will not miss anything",
362            currentPath, walGroupId);
363          return HasNext.RETRY;
364        }
365        assert !beingWritten || logQueue.getQueue(walGroupId).size() > 1;
366        // we have reached the trailer, which means this WAL file has been closed cleanly and we
367        // have finished reading it successfully, just move to the next WAL file and let the upper
368        // layer start reading the next WAL file
369        dequeueCurrentLog();
370        return HasNext.RETRY_IMMEDIATELY;
371      case EOF_AND_RESET:
372      case EOF_AND_RESET_COMPRESSION:
373        if (beingWritten) {
374          // just sleep a bit and retry to see if there are new entries coming since the file is
375          // still being written
376          return HasNext.RETRY;
377        }
378        // no more entries in this log file, and the file is already closed, i.e, rolled
379        // Before dequeuing, we should always get one more attempt at reading.
380        // This is in case more entries came in after we opened the reader, and the log is rolled
381        // while we were reading. See HBASE-6758
382        return lastAttempt();
383      case ERROR_AND_RESET:
384      case ERROR_AND_RESET_COMPRESSION:
385        // we have meet an error, just sleep a bit and retry again
386        return HasNext.RETRY;
387      default:
388        throw new IllegalArgumentException("Unknown read next result: " + state);
389    }
390  }
391
392  private FileStatus getCurrentPathFileStatus() throws IOException {
393    try {
394      return fs.getFileStatus(currentPath);
395    } catch (FileNotFoundException e) {
396      // try archived path
397      Path archivedWAL = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
398      if (archivedWAL != null) {
399        return fs.getFileStatus(archivedWAL);
400      } else {
401        throw e;
402      }
403    }
404  }
405
406  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
407  private boolean checkAllBytesParsed() {
408    // -1 means the wal wasn't closed cleanly.
409    final long trailerSize = currentTrailerSize();
410    FileStatus stat = null;
411    try {
412      stat = getCurrentPathFileStatus();
413    } catch (IOException e) {
414      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
415        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat(), e);
416      metrics.incrUnknownFileLengthForClosedWAL();
417    }
418    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
419    // We only call this method when currentEntry is null so usually they are the same, but there
420    // are two exceptions. One is we have nothing in the file but only a header, in this way
421    // the currentPositionOfEntry will always be 0 since we have no change to update it. The other
422    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
423    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
424    if (stat != null) {
425      if (trailerSize < 0) {
426        if (currentPositionOfReader < stat.getLen()) {
427          final long skippedBytes = stat.getLen() - currentPositionOfReader;
428          // See the commits in HBASE-25924/HBASE-25932 for context.
429          LOG.warn("Reached the end of WAL {}. It was not closed cleanly,"
430            + " so we did not parse {} bytes of data.", currentPath, skippedBytes);
431          metrics.incrUncleanlyClosedWALs();
432          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
433        }
434      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
435        LOG.warn(
436          "Processing end of WAL {} at position {}, which is too far away from"
437            + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
438          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
439        metrics.incrRestartedWALReading();
440        metrics.incrRepeatedFileBytes(currentPositionOfReader);
441        return false;
442      }
443    }
444    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
445      stat == null ? "N/A" : stat.getLen());
446    metrics.incrCompletedWAL();
447    return true;
448  }
449
450  private void dequeueCurrentLog() {
451    LOG.debug("EOF, closing {}", currentPath);
452    closeReader();
453    logQueue.remove(walGroupId);
454    setCurrentPath(null);
455    currentPositionOfEntry = 0;
456    state = null;
457  }
458
459  /**
460   * Returns whether the file is opened for writing.
461   */
462  private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() {
463    OptionalLong fileLength;
464    if (logQueue.getQueueSize(walGroupId) > 1) {
465      // if there are more than one files in queue, although it is possible that we are
466      // still trying to write the trailer of the file and it is not closed yet, we can
467      // make sure that we will not write any WAL entries to it any more, so it is safe
468      // to just let the upper layer try to read the whole file without limit
469      fileLength = OptionalLong.empty();
470    } else {
471      // if there is only one file in queue, check whether it is still being written to
472      // we must call this before actually reading from the reader, as this method will acquire the
473      // rollWriteLock. This is very important, as we will enqueue the new WAL file in postLogRoll,
474      // and before this happens, we could have already finished closing the previous WAL file. If
475      // we do not acquire the rollWriteLock and return whether the current file is being written
476      // to, we may finish reading the previous WAL file and start to read the next one, before it
477      // is enqueued into the logQueue, thus lead to an empty logQueue and make the shipper think
478      // the queue is already ended and quit. See HBASE-28114 and related issues for more details.
479      // in the future, if we want to optimize the logic here, for example, do not call this method
480      // every time, or do not acquire rollWriteLock in the implementation of this method, we need
481      // to carefully review the optimized implementation
482      fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
483    }
484    WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1));
485    long readerPos = readResult.getEntryEndPos();
486    Entry readEntry = readResult.getEntry();
487    if (readResult.getState() == WALTailingReader.State.NORMAL) {
488      LOG.trace("reading entry: {} ", readEntry);
489      metrics.incrLogEditsRead();
490      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
491      // record current entry and reader position
492      currentEntry = readResult.getEntry();
493      this.currentPositionOfReader = readerPos;
494    } else {
495      LOG.trace("reading entry failed with: {}", readResult.getState());
496      // set current entry to null
497      currentEntry = null;
498      try {
499        this.currentPositionOfReader = reader.getPosition();
500      } catch (IOException e) {
501        LOG.warn("failed to get current position of reader", e);
502        if (readResult.getState().resetCompression()) {
503          return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION,
504            fileLength.isPresent());
505        }
506      }
507    }
508    return Pair.newPair(readResult.getState(), fileLength.isPresent());
509  }
510
511  private void closeReader() {
512    if (reader != null) {
513      reader.close();
514      reader = null;
515    }
516  }
517
518  private long currentTrailerSize() {
519    long size = -1L;
520    if (reader instanceof AbstractProtobufWALReader) {
521      final AbstractProtobufWALReader pbwr = (AbstractProtobufWALReader) reader;
522      size = pbwr.trailerSize();
523    }
524    return size;
525  }
526}