/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}s, and
 * continually iterates through all the WAL {@link Entry} instances in the queue. When it is done
 * reading from a Path, it dequeues that Path and starts reading from the next one.
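 * <p>
 * A minimal usage sketch, assuming the caller has already built the log queue, length provider,
 * metrics and wal group id (the variable names below are illustrative only):
 *
 * <pre>
 * try (WALEntryStream stream = new WALEntryStream(logQueue, conf, 0L, walFileLengthProvider,
 *     serverName, metrics, walGroupId)) {
 *   while (stream.hasNext()) {
 *     Entry entry = stream.next();
 *     // ship the entry, then record stream.getPosition() as the resume point
 *   }
 * }
 * </pre>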
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class WALEntryStream implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);

  private Reader reader;
  private Path currentPath;
  // cache of next entry for hasNext()
  private Entry currentEntry;
  // position of the current entry. Since we support peek(), the upper layer may choose to return
  // before consuming the current entry, so it is not safe to report currentPositionOfReader (the
  // field below) from getPosition().
  private long currentPositionOfEntry = 0;
  // position after reading current entry
  private long currentPositionOfReader = 0;
  private final ReplicationSourceLogQueue logQueue;
  private final String walGroupId;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALFileLengthProvider walFileLengthProvider;
  // which region server the WALs belong to
  private final ServerName serverName;
  private final MetricsSource metrics;

  /**
   * Create an entry stream over the given queue at the given start position
   * @param logQueue              the queue of WAL paths
   * @param conf                  the {@link Configuration} to use to create {@link Reader} for this
   *                              stream
   * @param startPosition         the position in the first WAL to start reading at
   * @param walFileLengthProvider provides the length of the WAL file
   * @param serverName            the server name which all WALs belong to
   * @param metrics               the replication metrics
   * @throws IOException if an I/O error occurs while opening the stream
   */
  public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition,
    WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics,
    String walGroupId) throws IOException {
    this.logQueue = logQueue;
    this.fs = CommonFSUtils.getWALFileSystem(conf);
    this.conf = conf;
    this.currentPositionOfEntry = startPosition;
    this.walFileLengthProvider = walFileLengthProvider;
    this.serverName = serverName;
    this.metrics = metrics;
    this.walGroupId = walGroupId;
  }

  /** Returns true if there is another WAL {@link Entry} */
  public boolean hasNext() throws IOException {
    if (currentEntry == null) {
      tryAdvanceEntry();
    }
    return currentEntry != null;
  }

  /**
   * Returns the next WAL entry in this stream but does not advance.
   */
  public Entry peek() throws IOException {
    return hasNext() ? currentEntry : null;
  }

  /**
   * Returns the next WAL entry in this stream and advances the stream.
   */
  public Entry next() throws IOException {
    Entry save = peek();
    currentPositionOfEntry = currentPositionOfReader;
    currentEntry = null;
    return save;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void close() throws IOException {
    closeReader();
  }

  /** Returns the position of the last Entry returned by next() */
  public long getPosition() {
    return currentPositionOfEntry;
  }

  /** Returns the {@link Path} of the current WAL */
  public Path getCurrentPath() {
    return currentPath;
  }

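  // Builds a short human-readable description of the current replication position, used in log
  // messages.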
  private String getCurrentPathStat() {
    StringBuilder sb = new StringBuilder();
    if (currentPath != null) {
      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
        .append(currentPositionOfEntry).append("\n");
    } else {
      sb.append("no replication ongoing, waiting for new log");
    }
    return sb.toString();
  }

  /**
   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
   * false)
   */
  public void reset() throws IOException {
    if (reader != null && currentPath != null) {
      resetReader();
    }
  }

  private void setPosition(long position) {
    currentPositionOfEntry = position;
  }

  private void setCurrentPath(Path path) {
    this.currentPath = path;
  }

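  // Tries to load the next entry into currentEntry, opening or advancing the underlying reader as
  // needed; leaves currentEntry null if nothing new is available yet.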
  private void tryAdvanceEntry() throws IOException {
    if (checkReader()) {
      boolean beingWritten = readNextEntryAndRecordReaderPosition();
      LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
      if (currentEntry == null && !beingWritten) {
        // no more entries in this log file, and the file is already closed, i.e. rolled
        // Before dequeueing, we should always get one more attempt at reading.
        // This is in case more entries came in after we opened the reader, and the log was rolled
        // while we were reading. See HBASE-6758
        resetReader();
        readNextEntryAndRecordReaderPosition();
        if (currentEntry == null) {
          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
            dequeueCurrentLog();
            if (openNextLog()) {
              readNextEntryAndRecordReaderPosition();
            }
          }
        }
      }
      // if currentEntry != null then just return
      // if currentEntry == null but the file is still being written, then we should not switch to
      // the next log either; just return here and try again next time to see if there are more
      // entries in the current file
    }
    // do nothing if we don't have a WAL Reader (e.g. if there are no logs in the queue)
  }

  // HBASE-15984: check to see that we have in fact parsed all data in a cleanly closed file
  private boolean checkAllBytesParsed() throws IOException {
    // -1 means the WAL wasn't closed cleanly.
    final long trailerSize = currentTrailerSize();
    FileStatus stat = null;
    try {
      stat = fs.getFileStatus(this.currentPath);
    } catch (IOException exception) {
      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
      metrics.incrUnknownFileLengthForClosedWAL();
    }
    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
    // We only call this method when currentEntry is null, so usually they are the same, but there
    // are two exceptions. One is that the file contains nothing but a header, in which case
    // currentPositionOfEntry will always be 0 since we have had no chance to update it. The other
    // is that we reach the end of the file, where currentPositionOfEntry points to the tail of the
    // last valid entry while currentPositionOfReader usually points to the end of the file.
    if (stat != null) {
      if (trailerSize < 0) {
        if (currentPositionOfReader < stat.getLen()) {
          final long skippedBytes = stat.getLen() - currentPositionOfReader;
          // See the commits in HBASE-25924/HBASE-25932 for context.
          LOG.warn("Reached the end of WAL {}. It was not closed cleanly,"
            + " so we did not parse {} bytes of data.", currentPath, skippedBytes);
          metrics.incrUncleanlyClosedWALs();
          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
        }
      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
        LOG.warn(
          "Processing end of WAL {} at position {}, which is too far away from"
            + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
        setPosition(0);
        resetReader();
        metrics.incrRestartedWALReading();
        metrics.incrRepeatedFileBytes(currentPositionOfReader);
        return false;
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Reached the end of " + this.currentPath + " and length of the file is "
        + (stat == null ? "N/A" : stat.getLen()));
    }
    metrics.incrCompletedWAL();
    return true;
  }

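  // The current WAL has been fully read: close its reader, drop it from the queue and clear the
  // current path and position.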
  private void dequeueCurrentLog() throws IOException {
    LOG.debug("EOF, closing {}", currentPath);
    closeReader();
    logQueue.remove(walGroupId);
    setCurrentPath(null);
    setPosition(0);
  }

  /**
   * Returns whether the file is still being written to.
   */
  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
    Entry readEntry = reader.next();
    long readerPos = reader.getPosition();
    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
      // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
      // data, so we need to make sure that we do not read beyond the committed file length.
      if (LOG.isDebugEnabled()) {
        LOG.debug("The provider tells us the valid length for " + currentPath + " is "
          + fileLength.getAsLong() + ", but we have advanced to " + readerPos);
      }
      resetReader();
      return true;
    }
    if (readEntry != null) {
      LOG.trace("reading entry: {} ", readEntry);
      metrics.incrLogEditsRead();
      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
    }
    currentEntry = readEntry; // could be null
    this.currentPositionOfReader = readerPos;
    return fileLength.isPresent();
  }

  private void closeReader() throws IOException {
    if (reader != null) {
      reader.close();
      reader = null;
    }
  }

  // if we don't have a reader, open a reader on the next log
  private boolean checkReader() throws IOException {
    if (reader == null) {
      return openNextLog();
    }
    return true;
  }

  // open a reader on the next log in queue
  private boolean openNextLog() throws IOException {
    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
    Path nextPath = queue.peek();
    if (nextPath != null) {
      openReader(nextPath);
      if (reader != null) {
        return true;
      }
    } else {
      // no more files in the queue; this could happen for a recovered queue, or for a wal group of
      // a sync replication peer which has already been transitioned to DA or S.
      setCurrentPath(null);
    }
    return false;
  }

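  // The WAL is missing from its original location; if it has been moved to the archive directory,
  // keep reading from there, otherwise rethrow the original exception.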
  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
    // If the log was archived, continue reading from there
    Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
    // archivedLog can be null if unable to locate in archiveDir.
    if (archivedLog != null) {
      openReader(archivedLog);
    } else {
      throw fnfe;
    }
  }

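  // Open a reader on the given path, or reset the existing reader if we are already on that path.
  // Missing files are retried from the archive, while lease recovery and the HDFS-4380 NPE leave
  // the reader null so the caller can retry later.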
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
      justification = "HDFS-4380")
  private void openReader(Path path) throws IOException {
    try {
      // If this is a new file, get a new reader; otherwise reset the current reader so that we
      // see the new data
      if (reader == null || !getCurrentPath().equals(path)) {
        closeReader();
        reader = WALFactory.createReader(fs, path, conf);
        seek();
        setCurrentPath(path);
      } else {
        resetReader();
      }
    } catch (FileNotFoundException fnfe) {
      handleFileNotFound(path, fnfe);
    } catch (RemoteException re) {
      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
      if (!(ioe instanceof FileNotFoundException)) {
        throw ioe;
      }
      handleFileNotFound(path, (FileNotFoundException) ioe);
    } catch (LeaseNotRecoveredException lnre) {
      // HBASE-15019 the WAL was not closed due to some hiccup.
      LOG.warn("Try to recover the WAL lease " + path, lnre);
      recoverLease(conf, path);
      reader = null;
    } catch (NullPointerException npe) {
      // Workaround for race condition in HDFS-4380
      // which throws an NPE if we open a file before any data node has the most recent block.
      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
      LOG.warn("Got NPE opening reader, will retry.");
      reader = null;
    }
  }

  // For HBASE-15019: try to recover the lease on a WAL that was never properly closed.
  private void recoverLease(final Configuration conf, final Path path) {
    try {
      final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
        @Override
        public boolean progress() {
          LOG.debug("recover WAL lease: " + path);
          return true;
        }
      });
    } catch (IOException e) {
      LOG.warn("unable to recover lease for WAL: " + path, e);
    }
  }

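  // Reset the current reader so it picks up newly appended data and seek it back to the current
  // position; if the WAL has been moved to the archive, reopen it from there instead.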
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
      justification = "HDFS-4380")
  private void resetReader() throws IOException {
    try {
      currentEntry = null;
      reader.reset();
      seek();
    } catch (FileNotFoundException fnfe) {
      // If the log was archived, continue reading from there
      Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
      // archivedLog can be null if unable to locate in archiveDir.
      if (archivedLog != null) {
        openReader(archivedLog);
      } else {
        throw fnfe;
      }
    } catch (NullPointerException npe) {
      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
    }
  }

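  // Position the reader at the saved entry position, if we have one.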
  private void seek() throws IOException {
    if (currentPositionOfEntry != 0) {
      reader.seek(currentPositionOfEntry);
    }
  }

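  // Return the WAL trailer size if the reader exposes one, else -1 (treated by
  // checkAllBytesParsed() as "not closed cleanly").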
  private long currentTrailerSize() {
    long size = -1L;
    if (reader instanceof ProtobufLogReader) {
      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
      size = pblr.trailerSize();
    }
    return size;
  }
}