001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.util.OptionalLong;
024import java.util.concurrent.PriorityBlockingQueue;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
031import org.apache.hadoop.hbase.util.CancelableProgressable;
032import org.apache.hadoop.hbase.util.CommonFSUtils;
033import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
034import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.hadoop.hbase.wal.WAL.Reader;
038import org.apache.hadoop.hbase.wal.WALFactory;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
047 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
048 * dequeues it and starts reading from the next.
049 */
050@InterfaceAudience.Private
051@InterfaceStability.Evolving
052class WALEntryStream implements Closeable {
053  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
054
055  private Reader reader;
056  private Path currentPath;
057  // cache of next entry for hasNext()
058  private Entry currentEntry;
059  // position for the current entry. As now we support peek, which means that the upper layer may
060  // choose to return before reading the current entry, so it is not safe to return the value below
061  // in getPosition.
062  private long currentPositionOfEntry = 0;
063  // position after reading current entry
064  private long currentPositionOfReader = 0;
065  private final ReplicationSourceLogQueue logQueue;
066  private final String walGroupId;
067  private final FileSystem fs;
068  private final Configuration conf;
069  private final WALFileLengthProvider walFileLengthProvider;
070  // which region server the WALs belong to
071  private final ServerName serverName;
072  private final MetricsSource metrics;
073
074  /**
075   * Create an entry stream over the given queue at the given start position
076   * @param logQueue              the queue of WAL paths
077   * @param conf                  the {@link Configuration} to use to create {@link Reader} for this
078   *                              stream
079   * @param startPosition         the position in the first WAL to start reading at
080   * @param walFileLengthProvider provides the length of the WAL file
081   * @param serverName            the server name which all WALs belong to
082   * @param metrics               the replication metrics
083   * @throws IOException throw IO exception from stream
084   */
085  public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition,
086    WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics,
087    String walGroupId) throws IOException {
088    this.logQueue = logQueue;
089    this.fs = CommonFSUtils.getWALFileSystem(conf);
090    this.conf = conf;
091    this.currentPositionOfEntry = startPosition;
092    this.walFileLengthProvider = walFileLengthProvider;
093    this.serverName = serverName;
094    this.metrics = metrics;
095    this.walGroupId = walGroupId;
096  }
097
098  /**
099   * @return true if there is another WAL {@link Entry}
100   */
101  public boolean hasNext() throws IOException {
102    if (currentEntry == null) {
103      tryAdvanceEntry();
104    }
105    return currentEntry != null;
106  }
107
108  /**
109   * Returns the next WAL entry in this stream but does not advance.
110   */
111  public Entry peek() throws IOException {
112    return hasNext() ? currentEntry : null;
113  }
114
115  /**
116   * Returns the next WAL entry in this stream and advance the stream.
117   */
118  public Entry next() throws IOException {
119    Entry save = peek();
120    currentPositionOfEntry = currentPositionOfReader;
121    currentEntry = null;
122    return save;
123  }
124
125  /**
126   * {@inheritDoc}
127   */
128  @Override
129  public void close() throws IOException {
130    closeReader();
131  }
132
133  /**
134   * @return the position of the last Entry returned by next()
135   */
136  public long getPosition() {
137    return currentPositionOfEntry;
138  }
139
140  /**
141   * @return the {@link Path} of the current WAL
142   */
143  public Path getCurrentPath() {
144    return currentPath;
145  }
146
147  private String getCurrentPathStat() {
148    StringBuilder sb = new StringBuilder();
149    if (currentPath != null) {
150      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
151        .append(currentPositionOfEntry).append("\n");
152    } else {
153      sb.append("no replication ongoing, waiting for new log");
154    }
155    return sb.toString();
156  }
157
158  /**
159   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
160   * false)
161   */
162  public void reset() throws IOException {
163    if (reader != null && currentPath != null) {
164      resetReader();
165    }
166  }
167
168  private void setPosition(long position) {
169    currentPositionOfEntry = position;
170  }
171
172  private void setCurrentPath(Path path) {
173    this.currentPath = path;
174  }
175
176  private void tryAdvanceEntry() throws IOException {
177    if (checkReader()) {
178      boolean beingWritten = readNextEntryAndRecordReaderPosition();
179      LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
180      if (currentEntry == null && !beingWritten) {
181        // no more entries in this log file, and the file is already closed, i.e, rolled
182        // Before dequeueing, we should always get one more attempt at reading.
183        // This is in case more entries came in after we opened the reader, and the log is rolled
184        // while we were reading. See HBASE-6758
185        resetReader();
186        readNextEntryAndRecordReaderPosition();
187        if (currentEntry == null) {
188          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
189            dequeueCurrentLog();
190            if (openNextLog()) {
191              readNextEntryAndRecordReaderPosition();
192            }
193          }
194        }
195      }
196      // if currentEntry != null then just return
197      // if currentEntry == null but the file is still being written, then we should not switch to
198      // the next log either, just return here and try next time to see if there are more entries in
199      // the current file
200    }
201    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
202  }
203
204  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
205  private boolean checkAllBytesParsed() throws IOException {
206    // -1 means the wal wasn't closed cleanly.
207    final long trailerSize = currentTrailerSize();
208    FileStatus stat = null;
209    try {
210      stat = fs.getFileStatus(this.currentPath);
211    } catch (IOException exception) {
212      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
213        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
214      metrics.incrUnknownFileLengthForClosedWAL();
215    }
216    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
217    // We only call this method when currentEntry is null so usually they are the same, but there
218    // are two exceptions. One is we have nothing in the file but only a header, in this way
219    // the currentPositionOfEntry will always be 0 since we have no change to update it. The other
220    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
221    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
222    if (stat != null) {
223      if (trailerSize < 0) {
224        if (currentPositionOfReader < stat.getLen()) {
225          final long skippedBytes = stat.getLen() - currentPositionOfReader;
226          // See the commits in HBASE-25924/HBASE-25932 for context.
227          LOG.warn("Reached the end of WAL {}. It was not closed cleanly,"
228            + " so we did not parse {} bytes of data.", currentPath, skippedBytes);
229          metrics.incrUncleanlyClosedWALs();
230          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
231        }
232      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
233        LOG.warn(
234          "Processing end of WAL {} at position {}, which is too far away from"
235            + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
236          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
237        setPosition(0);
238        resetReader();
239        metrics.incrRestartedWALReading();
240        metrics.incrRepeatedFileBytes(currentPositionOfReader);
241        return false;
242      }
243    }
244    if (LOG.isTraceEnabled()) {
245      LOG.trace("Reached the end of " + this.currentPath + " and length of the file is "
246        + (stat == null ? "N/A" : stat.getLen()));
247    }
248    metrics.incrCompletedWAL();
249    return true;
250  }
251
252  private void dequeueCurrentLog() throws IOException {
253    LOG.debug("EOF, closing {}", currentPath);
254    closeReader();
255    logQueue.remove(walGroupId);
256    setCurrentPath(null);
257    setPosition(0);
258  }
259
260  /**
261   * Returns whether the file is opened for writing.
262   */
263  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
264    Entry readEntry = reader.next();
265    long readerPos = reader.getPosition();
266    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
267    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
268      // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
269      // data, so we need to make sure that we do not read beyond the committed file length.
270      if (LOG.isDebugEnabled()) {
271        LOG.debug("The provider tells us the valid length for " + currentPath + " is "
272          + fileLength.getAsLong() + ", but we have advanced to " + readerPos);
273      }
274      resetReader();
275      return true;
276    }
277    if (readEntry != null) {
278      LOG.trace("reading entry: {} ", readEntry);
279      metrics.incrLogEditsRead();
280      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
281    }
282    currentEntry = readEntry; // could be null
283    this.currentPositionOfReader = readerPos;
284    return fileLength.isPresent();
285  }
286
287  private void closeReader() throws IOException {
288    if (reader != null) {
289      reader.close();
290      reader = null;
291    }
292  }
293
294  // if we don't have a reader, open a reader on the next log
295  private boolean checkReader() throws IOException {
296    if (reader == null) {
297      return openNextLog();
298    }
299    return true;
300  }
301
302  // open a reader on the next log in queue
303  private boolean openNextLog() throws IOException {
304    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
305    Path nextPath = queue.peek();
306    if (nextPath != null) {
307      openReader(nextPath);
308      if (reader != null) {
309        return true;
310      }
311    } else {
312      // no more files in queue, this could happen for recovered queue, or for a wal group of a sync
313      // replication peer which has already been transited to DA or S.
314      setCurrentPath(null);
315    }
316    return false;
317  }
318
319  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
320    // If the log was archived, continue reading from there
321    Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
322    // archivedLog can be null if unable to locate in archiveDir.
323    if (archivedLog != null) {
324      openReader(archivedLog);
325    } else {
326      throw fnfe;
327    }
328  }
329
330  private void openReader(Path path) throws IOException {
331    try {
332      // Detect if this is a new file, if so get a new reader else
333      // reset the current reader so that we see the new data
334      if (reader == null || !getCurrentPath().equals(path)) {
335        closeReader();
336        reader = WALFactory.createReader(fs, path, conf);
337        seek();
338        setCurrentPath(path);
339      } else {
340        resetReader();
341      }
342    } catch (FileNotFoundException fnfe) {
343      handleFileNotFound(path, fnfe);
344    } catch (RemoteException re) {
345      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
346      if (!(ioe instanceof FileNotFoundException)) {
347        throw ioe;
348      }
349      handleFileNotFound(path, (FileNotFoundException) ioe);
350    } catch (LeaseNotRecoveredException lnre) {
351      // HBASE-15019 the WAL was not closed due to some hiccup.
352      LOG.warn("Try to recover the WAL lease " + path, lnre);
353      recoverLease(conf, path);
354      reader = null;
355    } catch (NullPointerException npe) {
356      // Workaround for race condition in HDFS-4380
357      // which throws a NPE if we open a file before any data node has the most recent block
358      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
359      LOG.warn("Got NPE opening reader, will retry.");
360      reader = null;
361    }
362  }
363
364  // For HBASE-15019
365  private void recoverLease(final Configuration conf, final Path path) {
366    try {
367      final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
368      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
369        @Override
370        public boolean progress() {
371          LOG.debug("recover WAL lease: " + path);
372          return true;
373        }
374      });
375    } catch (IOException e) {
376      LOG.warn("unable to recover lease for WAL: " + path, e);
377    }
378  }
379
380  private void resetReader() throws IOException {
381    try {
382      currentEntry = null;
383      reader.reset();
384      seek();
385    } catch (FileNotFoundException fnfe) {
386      // If the log was archived, continue reading from there
387      Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
388      // archivedLog can be null if unable to locate in archiveDir.
389      if (archivedLog != null) {
390        openReader(archivedLog);
391      } else {
392        throw fnfe;
393      }
394    } catch (NullPointerException npe) {
395      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
396    }
397  }
398
399  private void seek() throws IOException {
400    if (currentPositionOfEntry != 0) {
401      reader.seek(currentPositionOfEntry);
402    }
403  }
404
405  private long currentTrailerSize() {
406    long size = -1L;
407    if (reader instanceof ProtobufLogReader) {
408      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
409      size = pblr.trailerSize();
410    }
411    return size;
412  }
413}