001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.io.Closeable;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.util.OptionalLong;
025import java.util.concurrent.PriorityBlockingQueue;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
033import org.apache.hadoop.hbase.util.CancelableProgressable;
034import org.apache.hadoop.hbase.util.CommonFSUtils;
035import org.apache.hadoop.hbase.util.FSUtils;
036import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
037import org.apache.hadoop.hbase.wal.WAL.Entry;
038import org.apache.hadoop.hbase.wal.WAL.Reader;
039import org.apache.hadoop.hbase.wal.WALFactory;
040import org.apache.hadoop.ipc.RemoteException;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.yetus.audience.InterfaceStability;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
048 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
049 * dequeues it and starts reading from the next.
050 */
051@InterfaceAudience.Private
052@InterfaceStability.Evolving
053class WALEntryStream implements Closeable {
054  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
055
056  private Reader reader;
057  private Path currentPath;
058  // cache of next entry for hasNext()
059  private Entry currentEntry;
060  // position for the current entry. As now we support peek, which means that the upper layer may
061  // choose to return before reading the current entry, so it is not safe to return the value below
062  // in getPosition.
063  private long currentPositionOfEntry = 0;
064  // position after reading current entry
065  private long currentPositionOfReader = 0;
066  private final PriorityBlockingQueue<Path> logQueue;
067  private final FileSystem fs;
068  private final Configuration conf;
069  private final WALFileLengthProvider walFileLengthProvider;
070  // which region server the WALs belong to
071  private final ServerName serverName;
072  private final MetricsSource metrics;
073
074  /**
075   * Create an entry stream over the given queue at the given start position
076   * @param logQueue the queue of WAL paths
077   * @param conf the {@link Configuration} to use to create {@link Reader} for this stream
078   * @param startPosition the position in the first WAL to start reading at
079   * @param walFileLengthProvider provides the length of the WAL file
080   * @param serverName the server name which all WALs belong to
081   * @param metrics the replication metrics
082   * @throws IOException
083   */
084  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
085      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
086      MetricsSource metrics) throws IOException {
087    this.logQueue = logQueue;
088    this.fs = CommonFSUtils.getWALFileSystem(conf);
089    this.conf = conf;
090    this.currentPositionOfEntry = startPosition;
091    this.walFileLengthProvider = walFileLengthProvider;
092    this.serverName = serverName;
093    this.metrics = metrics;
094  }
095
096  /**
097   * @return true if there is another WAL {@link Entry}
098   */
099  public boolean hasNext() throws IOException {
100    if (currentEntry == null) {
101      tryAdvanceEntry();
102    }
103    return currentEntry != null;
104  }
105
106  /**
107   * Returns the next WAL entry in this stream but does not advance.
108   */
109  public Entry peek() throws IOException {
110    return hasNext() ? currentEntry: null;
111  }
112
113  /**
114   * Returns the next WAL entry in this stream and advance the stream.
115   */
116  public Entry next() throws IOException {
117    Entry save = peek();
118    currentPositionOfEntry = currentPositionOfReader;
119    currentEntry = null;
120    return save;
121  }
122
123  /**
124   * {@inheritDoc}
125   */
126  @Override
127  public void close() throws IOException {
128    closeReader();
129  }
130
131  /**
132   * @return the position of the last Entry returned by next()
133   */
134  public long getPosition() {
135    return currentPositionOfEntry;
136  }
137
138  /**
139   * @return the {@link Path} of the current WAL
140   */
141  public Path getCurrentPath() {
142    return currentPath;
143  }
144
145  private String getCurrentPathStat() {
146    StringBuilder sb = new StringBuilder();
147    if (currentPath != null) {
148      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
149          .append(currentPositionOfEntry).append("\n");
150    } else {
151      sb.append("no replication ongoing, waiting for new log");
152    }
153    return sb.toString();
154  }
155
156  /**
157   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
158   * false)
159   */
160  public void reset() throws IOException {
161    if (reader != null && currentPath != null) {
162      resetReader();
163    }
164  }
165
166  private void setPosition(long position) {
167    currentPositionOfEntry = position;
168  }
169
170  private void setCurrentPath(Path path) {
171    this.currentPath = path;
172  }
173
174  private void tryAdvanceEntry() throws IOException {
175    if (checkReader()) {
176      boolean beingWritten = readNextEntryAndRecordReaderPosition();
177      if (currentEntry == null && !beingWritten) {
178        // no more entries in this log file, and the file is already closed, i.e, rolled
179        // Before dequeueing, we should always get one more attempt at reading.
180        // This is in case more entries came in after we opened the reader, and the log is rolled
181        // while we were reading. See HBASE-6758
182        resetReader();
183        readNextEntryAndRecordReaderPosition();
184        if (currentEntry == null) {
185          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
186            dequeueCurrentLog();
187            if (openNextLog()) {
188              readNextEntryAndRecordReaderPosition();
189            }
190          }
191        }
192      }
193      // if currentEntry != null then just return
194      // if currentEntry == null but the file is still being written, then we should not switch to
195      // the next log either, just return here and try next time to see if there are more entries in
196      // the current file
197    }
198    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
199  }
200
201  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
202  private boolean checkAllBytesParsed() throws IOException {
203    // -1 means the wal wasn't closed cleanly.
204    final long trailerSize = currentTrailerSize();
205    FileStatus stat = null;
206    try {
207      stat = fs.getFileStatus(this.currentPath);
208    } catch (IOException exception) {
209      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
210        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
211      metrics.incrUnknownFileLengthForClosedWAL();
212    }
213    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
214    // We only call this method when currentEntry is null so usually they are the same, but there
215    // are two exceptions. One is we have nothing in the file but only a header, in this way
216    // the currentPositionOfEntry will always be 0 since we have no change to update it. The other
217    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
218    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
219    if (stat != null) {
220      if (trailerSize < 0) {
221        if (currentPositionOfReader < stat.getLen()) {
222          final long skippedBytes = stat.getLen() - currentPositionOfReader;
223          LOG.debug(
224            "Reached the end of WAL file '{}'. It was not closed cleanly," +
225              " so we did not parse {} bytes of data. This is normally ok.",
226            currentPath, skippedBytes);
227          metrics.incrUncleanlyClosedWALs();
228          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
229        }
230      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
231        LOG.warn(
232          "Processing end of WAL file '{}'. At position {}, which is too far away from" +
233            " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
234          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
235        setPosition(0);
236        resetReader();
237        metrics.incrRestartedWALReading();
238        metrics.incrRepeatedFileBytes(currentPositionOfReader);
239        return false;
240      }
241    }
242    if (LOG.isTraceEnabled()) {
243      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
244        (stat == null ? "N/A" : stat.getLen()));
245    }
246    metrics.incrCompletedWAL();
247    return true;
248  }
249
250  private void dequeueCurrentLog() throws IOException {
251    LOG.debug("Reached the end of log {}", currentPath);
252    closeReader();
253    logQueue.remove();
254    setPosition(0);
255    metrics.decrSizeOfLogQueue();
256  }
257
258  /**
259   * Returns whether the file is opened for writing.
260   */
261  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
262    Entry readEntry = reader.next();
263    long readerPos = reader.getPosition();
264    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
265    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
266      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
267      // data, so we need to make sure that we do not read beyond the committed file length.
268      if (LOG.isDebugEnabled()) {
269        LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
270            fileLength.getAsLong() + ", but we have advanced to " + readerPos);
271      }
272      resetReader();
273      return true;
274    }
275    if (readEntry != null) {
276      metrics.incrLogEditsRead();
277      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
278    }
279    currentEntry = readEntry; // could be null
280    this.currentPositionOfReader = readerPos;
281    return fileLength.isPresent();
282  }
283
284  private void closeReader() throws IOException {
285    if (reader != null) {
286      reader.close();
287      reader = null;
288    }
289  }
290
291  // if we don't have a reader, open a reader on the next log
292  private boolean checkReader() throws IOException {
293    if (reader == null) {
294      return openNextLog();
295    }
296    return true;
297  }
298
299  // open a reader on the next log in queue
300  private boolean openNextLog() throws IOException {
301    Path nextPath = logQueue.peek();
302    if (nextPath != null) {
303      openReader(nextPath);
304      if (reader != null) {
305        return true;
306      }
307    } else {
308      // no more files in queue, this could only happen for recovered queue.
309      setCurrentPath(null);
310    }
311    return false;
312  }
313
314  private Path getArchivedLog(Path path) throws IOException {
315    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
316
317    // Try found the log in old dir
318    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
319    Path archivedLogLocation = new Path(oldLogDir, path.getName());
320    if (fs.exists(archivedLogLocation)) {
321      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
322      return archivedLogLocation;
323    }
324
325    // Try found the log in the seperate old log dir
326    oldLogDir =
327        new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
328            .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
329    archivedLogLocation = new Path(oldLogDir, path.getName());
330    if (fs.exists(archivedLogLocation)) {
331      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
332      return archivedLogLocation;
333    }
334
335    LOG.error("Couldn't locate log: " + path);
336    return path;
337  }
338
339  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
340    // If the log was archived, continue reading from there
341    Path archivedLog = getArchivedLog(path);
342    if (!path.equals(archivedLog)) {
343      openReader(archivedLog);
344    } else {
345      throw fnfe;
346    }
347  }
348
349  private void openReader(Path path) throws IOException {
350    try {
351      // Detect if this is a new file, if so get a new reader else
352      // reset the current reader so that we see the new data
353      if (reader == null || !getCurrentPath().equals(path)) {
354        closeReader();
355        reader = WALFactory.createReader(fs, path, conf);
356        seek();
357        setCurrentPath(path);
358      } else {
359        resetReader();
360      }
361    } catch (FileNotFoundException fnfe) {
362      handleFileNotFound(path, fnfe);
363    }  catch (RemoteException re) {
364      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
365      if (!(ioe instanceof FileNotFoundException)) throw ioe;
366      handleFileNotFound(path, (FileNotFoundException)ioe);
367    } catch (LeaseNotRecoveredException lnre) {
368      // HBASE-15019 the WAL was not closed due to some hiccup.
369      LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
370      recoverLease(conf, currentPath);
371      reader = null;
372    } catch (NullPointerException npe) {
373      // Workaround for race condition in HDFS-4380
374      // which throws a NPE if we open a file before any data node has the most recent block
375      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
376      LOG.warn("Got NPE opening reader, will retry.");
377      reader = null;
378    }
379  }
380
381  // For HBASE-15019
382  private void recoverLease(final Configuration conf, final Path path) {
383    try {
384
385      final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
386      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
387      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
388        @Override
389        public boolean progress() {
390          LOG.debug("recover WAL lease: " + path);
391          return true;
392        }
393      });
394    } catch (IOException e) {
395      LOG.warn("unable to recover lease for WAL: " + path, e);
396    }
397  }
398
399  private void resetReader() throws IOException {
400    try {
401      currentEntry = null;
402      reader.reset();
403      seek();
404    } catch (FileNotFoundException fnfe) {
405      // If the log was archived, continue reading from there
406      Path archivedLog = getArchivedLog(currentPath);
407      if (!currentPath.equals(archivedLog)) {
408        openReader(archivedLog);
409      } else {
410        throw fnfe;
411      }
412    } catch (NullPointerException npe) {
413      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
414    }
415  }
416
417  private void seek() throws IOException {
418    if (currentPositionOfEntry != 0) {
419      reader.seek(currentPositionOfEntry);
420    }
421  }
422
423  private long currentTrailerSize() {
424    long size = -1L;
425    if (reader instanceof ProtobufLogReader) {
426      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
427      size = pblr.trailerSize();
428    }
429    return size;
430  }
431}