001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.EOFException;
021import java.io.IOException;
022import java.io.InputStream;
023import org.apache.hadoop.fs.FSDataInputStream;
024import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
025import org.apache.hadoop.hbase.wal.WAL.Entry;
026import org.apache.hadoop.hbase.wal.WALStreamReader;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
032
033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
035
036/**
037 * A one way stream reader for reading protobuf based WAL file.
038 */
039@InterfaceAudience.Private
040public class ProtobufWALStreamReader extends AbstractProtobufWALReader
041  implements WALStreamReader, AbstractFSWALProvider.Initializer {
042
043  private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class);
044
045  @Override
046  public Entry next(Entry reuse) throws IOException {
047    long originalPosition = getPosition();
048    if (reachWALEditsStopOffset(originalPosition)) {
049      return null;
050    }
051    WALProtos.WALKey walKey;
052    try {
053      // for one way stream reader, we do not care about what is the exact position where we hit the
054      // EOF or IOE, so just use the helper method to parse WALKey, in tailing reader, we will try
055      // to read the varint size by ourselves
056      walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALProtos.WALKey.parser());
057    } catch (InvalidProtocolBufferException e) {
058      if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
059        // InvalidProtocolBufferException.truncatedMessage, should throw EOF
060        // or we have started to read the partial WALTrailer
061        throw (EOFException) new EOFException("EOF while reading WALKey, originalPosition="
062          + originalPosition + ", currentPosition=" + inputStream.getPos()).initCause(e);
063      } else {
064        // For all other type of IPBEs, it means the WAL key is broken, throw IOException out to let
065        // the upper layer know, unless we have already reached the partial WALTrailer
066        throw (IOException) new IOException("Error while reading WALKey, originalPosition="
067          + originalPosition + ", currentPosition=" + inputStream.getPos()).initCause(e);
068      }
069    }
070    Entry entry = reuse;
071    if (entry == null) {
072      entry = new Entry();
073    }
074    entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor);
075    if (!walKey.hasFollowingKvCount() || walKey.getFollowingKvCount() == 0) {
076      LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
077        inputStream.getPos());
078      return entry;
079    }
080    int expectedCells = walKey.getFollowingKvCount();
081    long posBefore = getPosition();
082    int actualCells;
083    try {
084      actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
085    } catch (Exception e) {
086      String message = " while reading " + expectedCells + " WAL KVs; started reading at "
087        + posBefore + " and read up to " + getPositionQuietly();
088      IOException realEofEx = extractHiddenEof(e);
089      if (realEofEx != null) {
090        throw (EOFException) new EOFException("EOF " + message).initCause(realEofEx);
091      } else {
092        // do not throw EOFException as it could be other type of errors, throwing EOF will cause
093        // the upper layer to consider the file has been fully read and cause data loss.
094        throw new IOException("Error " + message, e);
095      }
096    }
097    if (expectedCells != actualCells) {
098      throw new EOFException("Only read " + actualCells + " cells, expected " + expectedCells
099        + "; started reading at " + posBefore + " and read up to " + getPositionQuietly());
100    }
101    long posAfter = this.inputStream.getPos();
102    if (trailerPresent && posAfter > this.walEditsStopOffset) {
103      LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {},"
104        + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset);
105      throw new EOFException("Read WALTrailer while reading WALEdits; started reading at "
106        + posBefore + " and read up to " + posAfter);
107    }
108    return entry;
109  }
110
111  @Override
112  protected InputStream getCellCodecInputStream(FSDataInputStream stream) {
113    // just return the original input stream
114    return stream;
115  }
116
117  @Override
118  protected void skipTo(long position) throws IOException {
119    Entry entry = new Entry();
120    for (;;) {
121      entry = next(entry);
122      if (entry == null) {
123        throw new EOFException("Can not skip to the given position " + position
124          + " as we have already reached the end of file");
125      }
126      long pos = inputStream.getPos();
127      if (pos > position) {
128        throw new IOException("Can not skip to the given position " + position + ", stopped at "
129          + pos + " which is already beyond the give position, malformed WAL?");
130      }
131      if (pos == position) {
132        return;
133      }
134    }
135  }
136}