001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.io.Closeable;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.util.NoSuchElementException;
025import java.util.OptionalLong;
026import java.util.concurrent.PriorityBlockingQueue;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.yetus.audience.InterfaceStability;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
039import org.apache.hadoop.hbase.util.CancelableProgressable;
040import org.apache.hadoop.hbase.util.CommonFSUtils;
041import org.apache.hadoop.hbase.util.FSUtils;
042import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
043import org.apache.hadoop.hbase.wal.WAL.Entry;
044import org.apache.hadoop.hbase.wal.WAL.Reader;
045import org.apache.hadoop.hbase.wal.WALFactory;
046import org.apache.hadoop.ipc.RemoteException;
047
048/**
049 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
050 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
051 * dequeues it and starts reading from the next.
052 */
053@InterfaceAudience.Private
054@InterfaceStability.Evolving
055class WALEntryStream implements Closeable {
056  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
057
058  private Reader reader;
059  private Path currentPath;
060  // cache of next entry for hasNext()
061  private Entry currentEntry;
062  // position after reading current entry
063  private long currentPosition = 0;
064  private final PriorityBlockingQueue<Path> logQueue;
065  private final FileSystem fs;
066  private final Configuration conf;
067  private final WALFileLengthProvider walFileLengthProvider;
068  // which region server the WALs belong to
069  private final ServerName serverName;
070  private final MetricsSource metrics;
071
072  /**
073   * Create an entry stream over the given queue at the given start position
074   * @param logQueue the queue of WAL paths
075   * @param conf the {@link Configuration} to use to create {@link Reader} for this stream
076   * @param startPosition the position in the first WAL to start reading at
077   * @param walFileLengthProvider provides the length of the WAL file
078   * @param serverName the server name which all WALs belong to
079   * @param metrics the replication metrics
080   * @throws IOException
081   */
082  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
083      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
084      MetricsSource metrics) throws IOException {
085    this.logQueue = logQueue;
086    this.fs = CommonFSUtils.getWALFileSystem(conf);
087    this.conf = conf;
088    this.currentPosition = startPosition;
089    this.walFileLengthProvider = walFileLengthProvider;
090    this.serverName = serverName;
091    this.metrics = metrics;
092  }
093
094  /**
095   * @return true if there is another WAL {@link Entry}
096   */
097  public boolean hasNext() throws IOException {
098    if (currentEntry == null) {
099      tryAdvanceEntry();
100    }
101    return currentEntry != null;
102  }
103
104  /**
105   * @return the next WAL entry in this stream
106   * @throws IOException
107   * @throws NoSuchElementException if no more entries in the stream.
108   */
109  public Entry next() throws IOException {
110    if (!hasNext()) {
111      throw new NoSuchElementException();
112    }
113    Entry save = currentEntry;
114    currentEntry = null; // gets reloaded by hasNext()
115    return save;
116  }
117
118  /**
119   * {@inheritDoc}
120   */
121  @Override
122  public void close() throws IOException {
123    closeReader();
124  }
125
126  /**
127   * @return the position of the last Entry returned by next()
128   */
129  public long getPosition() {
130    return currentPosition;
131  }
132
133  /**
134   * @return the {@link Path} of the current WAL
135   */
136  public Path getCurrentPath() {
137    return currentPath;
138  }
139
140  private String getCurrentPathStat() {
141    StringBuilder sb = new StringBuilder();
142    if (currentPath != null) {
143      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
144          .append(currentPosition).append("\n");
145    } else {
146      sb.append("no replication ongoing, waiting for new log");
147    }
148    return sb.toString();
149  }
150
151  /**
152   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
153   * false)
154   * @throws IOException
155   */
156  public void reset() throws IOException {
157    if (reader != null && currentPath != null) {
158      resetReader();
159    }
160  }
161
162  private void setPosition(long position) {
163    currentPosition = position;
164  }
165
166  private void setCurrentPath(Path path) {
167    this.currentPath = path;
168  }
169
170  private void tryAdvanceEntry() throws IOException {
171    if (checkReader()) {
172      boolean beingWritten = readNextEntryAndSetPosition();
173      if (currentEntry == null && !beingWritten) {
174        // no more entries in this log file, and the file is already closed, i.e, rolled
175        // Before dequeueing, we should always get one more attempt at reading.
176        // This is in case more entries came in after we opened the reader, and the log is rolled
177        // while we were reading. See HBASE-6758
178        resetReader();
179        readNextEntryAndSetPosition();
180        if (currentEntry == null) {
181          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
182            dequeueCurrentLog();
183            if (openNextLog()) {
184              readNextEntryAndSetPosition();
185            }
186          }
187        }
188      }
189      // if currentEntry != null then just return
190      // if currentEntry == null but the file is still being written, then we should not switch to
191      // the next log either, just return here and try next time to see if there are more entries in
192      // the current file
193    }
194    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
195  }
196
197  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
198  private boolean checkAllBytesParsed() throws IOException {
199    // -1 means the wal wasn't closed cleanly.
200    final long trailerSize = currentTrailerSize();
201    FileStatus stat = null;
202    try {
203      stat = fs.getFileStatus(this.currentPath);
204    } catch (IOException exception) {
205      LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
206          + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat());
207      metrics.incrUnknownFileLengthForClosedWAL();
208    }
209    if (stat != null) {
210      if (trailerSize < 0) {
211        if (currentPosition < stat.getLen()) {
212          final long skippedBytes = stat.getLen() - currentPosition;
213          if (LOG.isDebugEnabled()) {
214            LOG.debug("Reached the end of WAL file '" + currentPath
215                + "'. It was not closed cleanly, so we did not parse " + skippedBytes
216                + " bytes of data. This is normally ok.");
217          }
218          metrics.incrUncleanlyClosedWALs();
219          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
220        }
221      } else if (currentPosition + trailerSize < stat.getLen()) {
222        LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
223            + ", which is too far away from reported file length " + stat.getLen()
224            + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
225        setPosition(0);
226        resetReader();
227        metrics.incrRestartedWALReading();
228        metrics.incrRepeatedFileBytes(currentPosition);
229        return false;
230      }
231    }
232    if (LOG.isTraceEnabled()) {
233      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
234          + (stat == null ? "N/A" : stat.getLen()));
235    }
236    metrics.incrCompletedWAL();
237    return true;
238  }
239
240  private void dequeueCurrentLog() throws IOException {
241    if (LOG.isDebugEnabled()) {
242      LOG.debug("Reached the end of log " + currentPath);
243    }
244    closeReader();
245    logQueue.remove();
246    setPosition(0);
247    metrics.decrSizeOfLogQueue();
248  }
249
250  /**
251   * Returns whether the file is opened for writing.
252   */
253  private boolean readNextEntryAndSetPosition() throws IOException {
254    Entry readEntry = reader.next();
255    long readerPos = reader.getPosition();
256    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
257    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
258      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
259      // data, so we need to make sure that we do not read beyond the committed file length.
260      if (LOG.isDebugEnabled()) {
261        LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
262            fileLength.getAsLong() + ", but we have advanced to " + readerPos);
263      }
264      resetReader();
265      return true;
266    }
267    if (readEntry != null) {
268      metrics.incrLogEditsRead();
269      metrics.incrLogReadInBytes(readerPos - currentPosition);
270    }
271    currentEntry = readEntry; // could be null
272    setPosition(readerPos);
273    return fileLength.isPresent();
274  }
275
276  private void closeReader() throws IOException {
277    if (reader != null) {
278      reader.close();
279      reader = null;
280    }
281  }
282
283  // if we don't have a reader, open a reader on the next log
284  private boolean checkReader() throws IOException {
285    if (reader == null) {
286      return openNextLog();
287    }
288    return true;
289  }
290
291  // open a reader on the next log in queue
292  private boolean openNextLog() throws IOException {
293    Path nextPath = logQueue.peek();
294    if (nextPath != null) {
295      openReader(nextPath);
296      if (reader != null) {
297        return true;
298      }
299    }
300    return false;
301  }
302
303  private Path getArchivedLog(Path path) throws IOException {
304    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
305
306    // Try found the log in old dir
307    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
308    Path archivedLogLocation = new Path(oldLogDir, path.getName());
309    if (fs.exists(archivedLogLocation)) {
310      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
311      return archivedLogLocation;
312    }
313
314    // Try found the log in the seperate old log dir
315    oldLogDir =
316        new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
317            .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
318    archivedLogLocation = new Path(oldLogDir, path.getName());
319    if (fs.exists(archivedLogLocation)) {
320      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
321      return archivedLogLocation;
322    }
323
324    LOG.error("Couldn't locate log: " + path);
325    return path;
326  }
327
328  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
329    // If the log was archived, continue reading from there
330    Path archivedLog = getArchivedLog(path);
331    if (!path.equals(archivedLog)) {
332      openReader(archivedLog);
333    } else {
334      throw fnfe;
335    }
336  }
337
338  private void openReader(Path path) throws IOException {
339    try {
340      // Detect if this is a new file, if so get a new reader else
341      // reset the current reader so that we see the new data
342      if (reader == null || !getCurrentPath().equals(path)) {
343        closeReader();
344        reader = WALFactory.createReader(fs, path, conf);
345        seek();
346        setCurrentPath(path);
347      } else {
348        resetReader();
349      }
350    } catch (FileNotFoundException fnfe) {
351      handleFileNotFound(path, fnfe);
352    }  catch (RemoteException re) {
353      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
354      if (!(ioe instanceof FileNotFoundException)) throw ioe;
355      handleFileNotFound(path, (FileNotFoundException)ioe);
356    } catch (LeaseNotRecoveredException lnre) {
357      // HBASE-15019 the WAL was not closed due to some hiccup.
358      LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
359      recoverLease(conf, currentPath);
360      reader = null;
361    } catch (NullPointerException npe) {
362      // Workaround for race condition in HDFS-4380
363      // which throws a NPE if we open a file before any data node has the most recent block
364      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
365      LOG.warn("Got NPE opening reader, will retry.");
366      reader = null;
367    }
368  }
369
370  // For HBASE-15019
371  private void recoverLease(final Configuration conf, final Path path) {
372    try {
373
374      final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
375      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
376      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
377        @Override
378        public boolean progress() {
379          LOG.debug("recover WAL lease: " + path);
380          return true;
381        }
382      });
383    } catch (IOException e) {
384      LOG.warn("unable to recover lease for WAL: " + path, e);
385    }
386  }
387
388  private void resetReader() throws IOException {
389    try {
390      reader.reset();
391      seek();
392    } catch (FileNotFoundException fnfe) {
393      // If the log was archived, continue reading from there
394      Path archivedLog = getArchivedLog(currentPath);
395      if (!currentPath.equals(archivedLog)) {
396        openReader(archivedLog);
397      } else {
398        throw fnfe;
399      }
400    } catch (NullPointerException npe) {
401      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
402    }
403  }
404
405  private void seek() throws IOException {
406    if (currentPosition != 0) {
407      reader.seek(currentPosition);
408    }
409  }
410
411  private long currentTrailerSize() {
412    long size = -1L;
413    if (reader instanceof ProtobufLogReader) {
414      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
415      size = pblr.trailerSize();
416    }
417    return size;
418  }
419}