001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.util.OptionalLong;
024import java.util.concurrent.PriorityBlockingQueue;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
031import org.apache.hadoop.hbase.util.CancelableProgressable;
032import org.apache.hadoop.hbase.util.CommonFSUtils;
033import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
034import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.hadoop.hbase.wal.WAL.Reader;
038import org.apache.hadoop.hbase.wal.WALFactory;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
047 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
048 * dequeues it and starts reading from the next.
049 */
050@InterfaceAudience.Private
051@InterfaceStability.Evolving
052class WALEntryStream implements Closeable {
053  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
054
055  private Reader reader;
056  private Path currentPath;
057  // cache of next entry for hasNext()
058  private Entry currentEntry;
059  // position for the current entry. As now we support peek, which means that the upper layer may
060  // choose to return before reading the current entry, so it is not safe to return the value below
061  // in getPosition.
062  private long currentPositionOfEntry = 0;
063  // position after reading current entry
064  private long currentPositionOfReader = 0;
065  private final ReplicationSourceLogQueue logQueue;
066  private final String walGroupId;
067  private final FileSystem fs;
068  private final Configuration conf;
069  private final WALFileLengthProvider walFileLengthProvider;
070  // which region server the WALs belong to
071  private final ServerName serverName;
072  private final MetricsSource metrics;
073
074  /**
075   * Create an entry stream over the given queue at the given start position
076   * @param logQueue              the queue of WAL paths
077   * @param conf                  the {@link Configuration} to use to create {@link Reader} for this
078   *                              stream
079   * @param startPosition         the position in the first WAL to start reading at
080   * @param walFileLengthProvider provides the length of the WAL file
081   * @param serverName            the server name which all WALs belong to
082   * @param metrics               the replication metrics
083   * @throws IOException throw IO exception from stream
084   */
085  public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition,
086    WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics,
087    String walGroupId) throws IOException {
088    this.logQueue = logQueue;
089    this.fs = CommonFSUtils.getWALFileSystem(conf);
090    this.conf = conf;
091    this.currentPositionOfEntry = startPosition;
092    this.walFileLengthProvider = walFileLengthProvider;
093    this.serverName = serverName;
094    this.metrics = metrics;
095    this.walGroupId = walGroupId;
096  }
097
098  /** Returns true if there is another WAL {@link Entry} */
099  public boolean hasNext() throws IOException {
100    if (currentEntry == null) {
101      tryAdvanceEntry();
102    }
103    return currentEntry != null;
104  }
105
106  /**
107   * Returns the next WAL entry in this stream but does not advance.
108   */
109  public Entry peek() throws IOException {
110    return hasNext() ? currentEntry : null;
111  }
112
113  /**
114   * Returns the next WAL entry in this stream and advance the stream.
115   */
116  public Entry next() throws IOException {
117    Entry save = peek();
118    currentPositionOfEntry = currentPositionOfReader;
119    currentEntry = null;
120    return save;
121  }
122
123  /**
124   * {@inheritDoc}
125   */
126  @Override
127  public void close() throws IOException {
128    closeReader();
129  }
130
131  /** Returns the position of the last Entry returned by next() */
132  public long getPosition() {
133    return currentPositionOfEntry;
134  }
135
136  /** Returns the {@link Path} of the current WAL */
137  public Path getCurrentPath() {
138    return currentPath;
139  }
140
141  private String getCurrentPathStat() {
142    StringBuilder sb = new StringBuilder();
143    if (currentPath != null) {
144      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
145        .append(currentPositionOfEntry).append("\n");
146    } else {
147      sb.append("no replication ongoing, waiting for new log");
148    }
149    return sb.toString();
150  }
151
152  /**
153   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
154   * false)
155   */
156  public void reset() throws IOException {
157    if (reader != null && currentPath != null) {
158      resetReader();
159    }
160  }
161
162  private void setPosition(long position) {
163    currentPositionOfEntry = position;
164  }
165
166  private void setCurrentPath(Path path) {
167    this.currentPath = path;
168  }
169
170  private void tryAdvanceEntry() throws IOException {
171    if (checkReader()) {
172      boolean beingWritten = readNextEntryAndRecordReaderPosition();
173      LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
174      if (currentEntry == null && !beingWritten) {
175        // no more entries in this log file, and the file is already closed, i.e, rolled
176        // Before dequeueing, we should always get one more attempt at reading.
177        // This is in case more entries came in after we opened the reader, and the log is rolled
178        // while we were reading. See HBASE-6758
179        resetReader();
180        readNextEntryAndRecordReaderPosition();
181        if (currentEntry == null) {
182          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
183            dequeueCurrentLog();
184            if (openNextLog()) {
185              readNextEntryAndRecordReaderPosition();
186            }
187          }
188        }
189      }
190      // if currentEntry != null then just return
191      // if currentEntry == null but the file is still being written, then we should not switch to
192      // the next log either, just return here and try next time to see if there are more entries in
193      // the current file
194    }
195    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
196  }
197
198  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
199  private boolean checkAllBytesParsed() throws IOException {
200    // -1 means the wal wasn't closed cleanly.
201    final long trailerSize = currentTrailerSize();
202    FileStatus stat = null;
203    try {
204      stat = fs.getFileStatus(this.currentPath);
205    } catch (IOException exception) {
206      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
207        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
208      metrics.incrUnknownFileLengthForClosedWAL();
209    }
210    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
211    // We only call this method when currentEntry is null so usually they are the same, but there
212    // are two exceptions. One is we have nothing in the file but only a header, in this way
213    // the currentPositionOfEntry will always be 0 since we have no change to update it. The other
214    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
215    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
216    if (stat != null) {
217      if (trailerSize < 0) {
218        if (currentPositionOfReader < stat.getLen()) {
219          final long skippedBytes = stat.getLen() - currentPositionOfReader;
220          // See the commits in HBASE-25924/HBASE-25932 for context.
221          LOG.warn("Reached the end of WAL {}. It was not closed cleanly,"
222            + " so we did not parse {} bytes of data.", currentPath, skippedBytes);
223          metrics.incrUncleanlyClosedWALs();
224          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
225        }
226      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
227        LOG.warn(
228          "Processing end of WAL {} at position {}, which is too far away from"
229            + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
230          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
231        setPosition(0);
232        resetReader();
233        metrics.incrRestartedWALReading();
234        metrics.incrRepeatedFileBytes(currentPositionOfReader);
235        return false;
236      }
237    }
238    if (LOG.isTraceEnabled()) {
239      LOG.trace("Reached the end of " + this.currentPath + " and length of the file is "
240        + (stat == null ? "N/A" : stat.getLen()));
241    }
242    metrics.incrCompletedWAL();
243    return true;
244  }
245
246  private void dequeueCurrentLog() throws IOException {
247    LOG.debug("EOF, closing {}", currentPath);
248    closeReader();
249    logQueue.remove(walGroupId);
250    setCurrentPath(null);
251    setPosition(0);
252  }
253
254  /**
255   * Returns whether the file is opened for writing.
256   */
257  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
258    Entry readEntry = reader.next();
259    long readerPos = reader.getPosition();
260    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
261    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
262      // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
263      // data, so we need to make sure that we do not read beyond the committed file length.
264      if (LOG.isDebugEnabled()) {
265        LOG.debug("The provider tells us the valid length for " + currentPath + " is "
266          + fileLength.getAsLong() + ", but we have advanced to " + readerPos);
267      }
268      resetReader();
269      return true;
270    }
271    if (readEntry != null) {
272      LOG.trace("reading entry: {} ", readEntry);
273      metrics.incrLogEditsRead();
274      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
275    }
276    currentEntry = readEntry; // could be null
277    this.currentPositionOfReader = readerPos;
278    return fileLength.isPresent();
279  }
280
281  private void closeReader() throws IOException {
282    if (reader != null) {
283      reader.close();
284      reader = null;
285    }
286  }
287
288  // if we don't have a reader, open a reader on the next log
289  private boolean checkReader() throws IOException {
290    if (reader == null) {
291      return openNextLog();
292    }
293    return true;
294  }
295
296  // open a reader on the next log in queue
297  private boolean openNextLog() throws IOException {
298    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
299    Path nextPath = queue.peek();
300    if (nextPath != null) {
301      openReader(nextPath);
302      if (reader != null) {
303        return true;
304      }
305    } else {
306      // no more files in queue, this could only happen for recovered queue.
307      setCurrentPath(null);
308    }
309    return false;
310  }
311
312  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
313    // If the log was archived, continue reading from there
314    Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
315    // archivedLog can be null if unable to locate in archiveDir.
316    if (archivedLog != null) {
317      openReader(archivedLog);
318    } else {
319      throw fnfe;
320    }
321  }
322
323  private void openReader(Path path) throws IOException {
324    try {
325      // Detect if this is a new file, if so get a new reader else
326      // reset the current reader so that we see the new data
327      if (reader == null || !getCurrentPath().equals(path)) {
328        closeReader();
329        reader = WALFactory.createReader(fs, path, conf);
330        seek();
331        setCurrentPath(path);
332      } else {
333        resetReader();
334      }
335    } catch (FileNotFoundException fnfe) {
336      handleFileNotFound(path, fnfe);
337    } catch (RemoteException re) {
338      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
339      if (!(ioe instanceof FileNotFoundException)) {
340        throw ioe;
341      }
342      handleFileNotFound(path, (FileNotFoundException) ioe);
343    } catch (LeaseNotRecoveredException lnre) {
344      // HBASE-15019 the WAL was not closed due to some hiccup.
345      LOG.warn("Try to recover the WAL lease " + path, lnre);
346      recoverLease(conf, path);
347      reader = null;
348    } catch (NullPointerException npe) {
349      // Workaround for race condition in HDFS-4380
350      // which throws a NPE if we open a file before any data node has the most recent block
351      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
352      LOG.warn("Got NPE opening reader, will retry.");
353      reader = null;
354    }
355  }
356
357  // For HBASE-15019
358  private void recoverLease(final Configuration conf, final Path path) {
359    try {
360      final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
361      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
362        @Override
363        public boolean progress() {
364          LOG.debug("recover WAL lease: " + path);
365          return true;
366        }
367      });
368    } catch (IOException e) {
369      LOG.warn("unable to recover lease for WAL: " + path, e);
370    }
371  }
372
373  private void resetReader() throws IOException {
374    try {
375      currentEntry = null;
376      reader.reset();
377      seek();
378    } catch (FileNotFoundException fnfe) {
379      // If the log was archived, continue reading from there
380      Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
381      // archivedLog can be null if unable to locate in archiveDir.
382      if (archivedLog != null) {
383        openReader(archivedLog);
384      } else {
385        throw fnfe;
386      }
387    } catch (NullPointerException npe) {
388      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
389    }
390  }
391
392  private void seek() throws IOException {
393    if (currentPositionOfEntry != 0) {
394      reader.seek(currentPositionOfEntry);
395    }
396  }
397
398  private long currentTrailerSize() {
399    long size = -1L;
400    if (reader instanceof ProtobufLogReader) {
401      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
402      size = pblr.trailerSize();
403    }
404    return size;
405  }
406}