/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}s, and it
 * continually iterates through all the WAL {@link Entry} instances in the queue. When it is done
 * reading from a Path, it dequeues that Path and starts reading from the next one.
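 * <p>
 * A minimal usage sketch (illustrative only; it assumes the caller has already populated the log
 * queue and wired up a {@link WALFileLengthProvider} and a {@link MetricsSource}):
 * </p>
 *
 * <pre>{@code
 * try (WALEntryStream entryStream =
 *     new WALEntryStream(logQueue, conf, 0, walFileLengthProvider, serverName, metrics)) {
 *   while (entryStream.hasNext()) {
 *     Entry entry = entryStream.next();
 *     // ship the entry, then remember entryStream.getPosition() so reading can resume later
 *   }
 * }
 * }</pre>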
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class WALEntryStream implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);

  private Reader reader;
  private Path currentPath;
  // cache of the next entry for hasNext()
  private Entry currentEntry;
  // position of the current entry. Since we support peek, the upper layer may decide to return
  // before actually reading the current entry, so it is not safe to return the reader position
  // below from getPosition.
  private long currentPositionOfEntry = 0;
  // position after reading the current entry
  private long currentPositionOfReader = 0;
  private final PriorityBlockingQueue<Path> logQueue;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALFileLengthProvider walFileLengthProvider;
  // which region server the WALs belong to
  private final ServerName serverName;
  private final MetricsSource metrics;

  /**
   * Create an entry stream over the given queue at the given start position.
   * @param logQueue the queue of WAL paths
   * @param conf the {@link Configuration} to use to create a {@link Reader} for this stream
   * @param startPosition the position in the first WAL to start reading at
   * @param walFileLengthProvider provides the current length of a WAL file if it is still being
   *          written to
   * @param serverName the server name which all WALs belong to
   * @param metrics the replication metrics
   * @throws IOException if the WAL {@link FileSystem} cannot be obtained
   */
  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
      MetricsSource metrics) throws IOException {
    this.logQueue = logQueue;
    this.fs = CommonFSUtils.getWALFileSystem(conf);
    this.conf = conf;
    this.currentPositionOfEntry = startPosition;
    this.walFileLengthProvider = walFileLengthProvider;
    this.serverName = serverName;
    this.metrics = metrics;
  }

  /**
   * @return true if there is another WAL {@link Entry}
   */
  public boolean hasNext() throws IOException {
    if (currentEntry == null) {
      tryAdvanceEntry();
    }
    return currentEntry != null;
  }

  /**
   * Returns the next WAL entry in this stream but does not advance.
   */
  public Entry peek() throws IOException {
    return hasNext() ? currentEntry : null;
  }

  /**
   * Returns the next WAL entry in this stream and advances the stream.
   */
  public Entry next() throws IOException {
    Entry save = peek();
    currentPositionOfEntry = currentPositionOfReader;
    currentEntry = null;
    return save;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void close() throws IOException {
    closeReader();
  }

  /**
   * @return the position of the last Entry returned by next()
   */
  public long getPosition() {
    return currentPositionOfEntry;
  }

  /**
   * @return the {@link Path} of the current WAL
   */
  public Path getCurrentPath() {
    return currentPath;
  }

  private String getCurrentPathStat() {
    StringBuilder sb = new StringBuilder();
    if (currentPath != null) {
      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(currentPositionOfEntry).append("\n");
    } else {
      sb.append("no replication ongoing, waiting for new log");
    }
    return sb.toString();
  }

  /**
   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
   * false)
   */
  public void reset() throws IOException {
    if (reader != null && currentPath != null) {
      resetReader();
    }
  }

  private void setPosition(long position) {
    currentPositionOfEntry = position;
  }

  private void setCurrentPath(Path path) {
    this.currentPath = path;
  }

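  /**
   * Tries to populate {@link #currentEntry}: ensures a reader is open on the WAL at the head of the
   * queue, reads the next entry, and, once the current file has been closed (rolled) and fully
   * parsed, dequeues it and moves on to the next WAL in the queue. The inline comments below cover
   * the corner cases this has to handle.
   */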
  private void tryAdvanceEntry() throws IOException {
    if (checkReader()) {
      boolean beingWritten = readNextEntryAndRecordReaderPosition();
      LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
      if (currentEntry == null && !beingWritten) {
        // no more entries in this log file, and the file is already closed, i.e., rolled.
        // Before dequeueing, we should always get one more attempt at reading.
        // This is in case more entries came in after we opened the reader, and the log was rolled
        // while we were reading. See HBASE-6758
        resetReader();
        readNextEntryAndRecordReaderPosition();
        if (currentEntry == null) {
          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
            dequeueCurrentLog();
            if (openNextLog()) {
              readNextEntryAndRecordReaderPosition();
            }
          }
        }
      }
      // if currentEntry != null then just return
      // if currentEntry == null but the file is still being written, then we should not switch to
      // the next log either; just return here and try again next time to see if there are more
      // entries in the current file
    }
    // do nothing if we don't have a WAL Reader (e.g. if there are no logs in the queue)
  }

  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
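  // Returns true once we are confident that the whole file has been consumed, so the caller can
  // safely dequeue it. Returns false when the reported file length suggests there is still unread
  // data; in that case the read position is reset to 0 and the WAL will be re-read from the
  // beginning (see HBASE-15983).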
  private boolean checkAllBytesParsed() throws IOException {
    // -1 means the wal wasn't closed cleanly.
    final long trailerSize = currentTrailerSize();
    FileStatus stat = null;
    try {
      stat = fs.getFileStatus(this.currentPath);
    } catch (IOException exception) {
      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
      metrics.incrUnknownFileLengthForClosedWAL();
    }
    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
    // We only call this method when currentEntry is null, so usually they are the same, but there
    // are two exceptions. One is when the file contains nothing but a header; in that case
    // currentPositionOfEntry will always be 0 since we have had no chance to update it. The other
    // is when we reach the end of the file, then currentPositionOfEntry will point to the tail of
    // the last valid entry, while currentPositionOfReader will usually point to the end of the
    // file.
    if (stat != null) {
      if (trailerSize < 0) {
        if (currentPositionOfReader < stat.getLen()) {
          final long skippedBytes = stat.getLen() - currentPositionOfReader;
          LOG.debug(
            "Reached the end of WAL file '{}'. It was not closed cleanly," +
              " so we did not parse {} bytes of data. This is normally ok.",
            currentPath, skippedBytes);
          metrics.incrUncleanlyClosedWALs();
          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
        }
      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
        LOG.warn(
          "Processing end of WAL file '{}'. At position {}, which is too far away from" +
            " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
        setPosition(0);
        resetReader();
        metrics.incrRestartedWALReading();
        metrics.incrRepeatedFileBytes(currentPositionOfReader);
        return false;
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
        (stat == null ? "N/A" : stat.getLen()));
    }
    metrics.incrCompletedWAL();
    return true;
  }

  private void dequeueCurrentLog() throws IOException {
    LOG.debug("Reached the end of log {}", currentPath);
    closeReader();
    logQueue.remove();
    setPosition(0);
    metrics.decrSizeOfLogQueue();
  }

  /**
   * Returns whether the file is opened for writing.
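   * <p>
   * Apart from reading the next entry, this also records the reader position afterwards in
   * {@link #currentPositionOfReader} and caches the read entry (which may be null at end of file)
   * in {@link #currentEntry}. The one exception is when the reader has advanced past the length
   * reported by the {@link WALFileLengthProvider} (possible for AsyncFSWAL, see HBASE-14004): in
   * that case the reader is reset so that we never hand out data which is not yet committed.
   * </p>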
   */
  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
    Entry readEntry = reader.next();
    long readerPos = reader.getPosition();
    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
      // data, so we need to make sure that we do not read beyond the committed file length.
      if (LOG.isDebugEnabled()) {
        LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
            fileLength.getAsLong() + ", but we have advanced to " + readerPos);
      }
      resetReader();
      return true;
    }
    if (readEntry != null) {
      LOG.trace("reading entry: {} ", readEntry);
      metrics.incrLogEditsRead();
      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
    }
    currentEntry = readEntry; // could be null
    this.currentPositionOfReader = readerPos;
    return fileLength.isPresent();
  }

  private void closeReader() throws IOException {
    if (reader != null) {
      reader.close();
      reader = null;
    }
  }

  // if we don't have a reader, open a reader on the next log
  private boolean checkReader() throws IOException {
    if (reader == null) {
      return openNextLog();
    }
    return true;
  }

  // open a reader on the next log in queue
  private boolean openNextLog() throws IOException {
    Path nextPath = logQueue.peek();
    if (nextPath != null) {
      openReader(nextPath);
      if (reader != null) {
        return true;
      }
    } else {
      // no more files in queue, this could only happen for recovered queue.
      setCurrentPath(null);
    }
    return false;
  }

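  // A WAL that has already been archived has been moved out of the WALs directory into the old log
  // directory, either directly or under a per region server sub-directory. Check both locations and
  // fall back to the original path if the file cannot be found in either of them.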
  private Path getArchivedLog(Path path) throws IOException {
    Path walRootDir = CommonFSUtils.getWALRootDir(conf);

    // Try to find the log in the old log dir
    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    Path archivedLogLocation = new Path(oldLogDir, path.getName());
    if (fs.exists(archivedLogLocation)) {
      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
      return archivedLogLocation;
    }

    // Try to find the log in the separate old log dir for this region server
    oldLogDir =
        new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
            .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
    archivedLogLocation = new Path(oldLogDir, path.getName());
    if (fs.exists(archivedLogLocation)) {
      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
      return archivedLogLocation;
    }

    LOG.error("Couldn't locate log: " + path);
    return path;
  }

  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
    // If the log was archived, continue reading from there
    Path archivedLog = getArchivedLog(path);
    if (!path.equals(archivedLog)) {
      openReader(archivedLog);
    } else {
      throw fnfe;
    }
  }

  private void openReader(Path path) throws IOException {
    try {
      // Detect if this is a new file, if so get a new reader else
      // reset the current reader so that we see the new data
      if (reader == null || !getCurrentPath().equals(path)) {
        closeReader();
        reader = WALFactory.createReader(fs, path, conf);
        seek();
        setCurrentPath(path);
      } else {
        resetReader();
      }
    } catch (FileNotFoundException fnfe) {
      handleFileNotFound(path, fnfe);
    } catch (RemoteException re) {
      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
      if (!(ioe instanceof FileNotFoundException)) {
        throw ioe;
      }
      handleFileNotFound(path, (FileNotFoundException) ioe);
    } catch (LeaseNotRecoveredException lnre) {
      // HBASE-15019 the WAL was not closed due to some hiccup.
      LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
      recoverLease(conf, currentPath);
      reader = null;
    } catch (NullPointerException npe) {
      // Workaround for race condition in HDFS-4380
      // which throws a NPE if we open a file before any data node has the most recent block
      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
      LOG.warn("Got NPE opening reader, will retry.");
      reader = null;
    }
  }

  // For HBASE-15019
  private void recoverLease(final Configuration conf, final Path path) {
    try {
      final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
        @Override
        public boolean progress() {
          LOG.debug("recover WAL lease: " + path);
          return true;
        }
      });
    } catch (IOException e) {
      LOG.warn("unable to recover lease for WAL: " + path, e);
    }
  }

  private void resetReader() throws IOException {
    try {
      currentEntry = null;
      reader.reset();
      seek();
    } catch (FileNotFoundException fnfe) {
      // If the log was archived, continue reading from there
      Path archivedLog = getArchivedLog(currentPath);
      if (!currentPath.equals(archivedLog)) {
        openReader(archivedLog);
      } else {
        throw fnfe;
      }
    } catch (NullPointerException npe) {
      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
    }
  }

  private void seek() throws IOException {
    if (currentPositionOfEntry != 0) {
      reader.seek(currentPositionOfEntry);
    }
  }

  private long currentTrailerSize() {
    long size = -1L;
    if (reader instanceof ProtobufLogReader) {
      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
      size = pblr.trailerSize();
    }
    return size;
  }
}