001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver.wal;
021
022import java.io.EOFException;
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.List;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.FSDataInputStream;
033import org.apache.hadoop.hbase.codec.Codec;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.wal.WAL.Entry;
042import org.apache.yetus.audience.InterfaceAudience;
043
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
048import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
049import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
050
051/**
052 * A Protobuf based WAL has the following structure:
053 * <p>
054 * &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
055 * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
056 * </p>
057 * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
058 * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure
059 * which is appended at the end of the WAL. This is empty for now; it can contain some meta
060 * information such as Region level stats, etc in future.
061 */
062@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
063  HBaseInterfaceAudience.CONFIG})
064public class ProtobufLogReader extends ReaderBase {
065  private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogReader.class);
066  // public for WALFactory until we move everything to o.a.h.h.wal
067  @InterfaceAudience.Private
068  public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
069  // public for TestWALSplit
070  @InterfaceAudience.Private
071  public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
072  /**
073   * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
074   * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
075   */
076  static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
077  static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
078
079  protected FSDataInputStream inputStream;
080  protected Codec.Decoder cellDecoder;
081  protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
082  protected boolean hasCompression = false;
083  protected boolean hasTagCompression = false;
084  // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
085  // entry in the wal, the inputstream's position is equal to walEditsStopOffset.
086  private long walEditsStopOffset;
087  private boolean trailerPresent;
088  protected WALTrailer trailer;
089  // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
090  // than this size, it is written/read respectively, with a WARN message in the log.
091  protected int trailerWarnSize;
092  private static List<String> writerClsNames = new ArrayList<>();
093  static {
094    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
095    writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
096  }
097  
098  // cell codec classname
099  private String codecClsName = null;
100
101  @InterfaceAudience.Private
102  public long trailerSize() {
103    if (trailerPresent) {
104      // sizeof PB_WAL_COMPLETE_MAGIC + sizof trailerSize + trailer
105      final long calculatedSize = (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT
106          + trailer.getSerializedSize();
107      final long expectedSize = fileLength - walEditsStopOffset;
108      if (expectedSize != calculatedSize) {
109        LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we "
110            + "calculate it as being {}", expectedSize, calculatedSize);
111      }
112      return expectedSize;
113    } else {
114      return -1L;
115    }
116  }
117
118  enum WALHdrResult {
119    EOF,                   // stream is at EOF when method starts
120    SUCCESS,
121    UNKNOWN_WRITER_CLS     // name of writer class isn't recognized
122  }
123  
124  // context for WALHdr carrying information such as Cell Codec classname
125  static class WALHdrContext {
126    WALHdrResult result;
127    String cellCodecClsName;
128    
129    WALHdrContext(WALHdrResult result, String cellCodecClsName) {
130      this.result = result;
131      this.cellCodecClsName = cellCodecClsName;
132    }
133    WALHdrResult getResult() {
134      return result;
135    }
136    String getCellCodecClsName() {
137      return cellCodecClsName;
138    }
139  }
140
141  public ProtobufLogReader() {
142    super();
143  }
144
145  @Override
146  public void close() throws IOException {
147    if (this.inputStream != null) {
148      this.inputStream.close();
149      this.inputStream = null;
150    }
151  }
152
153  @Override
154  public long getPosition() throws IOException {
155    return inputStream.getPos();
156  }
157
158  @Override
159  public void reset() throws IOException {
160    String clsName = initInternal(null, false);
161    initAfterCompression(clsName); // We need a new decoder (at least).
162  }
163
164  @Override
165  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
166      throws IOException {
167    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
168    super.init(fs, path, conf, stream);
169  }
170
171  @Override
172  protected String initReader(FSDataInputStream stream) throws IOException {
173    return initInternal(stream, true);
174  }
175
176  /*
177   * Returns names of the accepted writer classes
178   */
179  public List<String> getWriterClsNames() {
180    return writerClsNames;
181  }
182  
183  /*
184   * Returns the cell codec classname
185   */
186  public String getCodecClsName() {
187      return codecClsName;
188  }
189
190  protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
191      throws IOException {
192     boolean res = builder.mergeDelimitedFrom(stream);
193     if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
194     if (builder.hasWriterClsName() &&
195         !getWriterClsNames().contains(builder.getWriterClsName())) {
196       return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
197     }
198     String clsName = null;
199     if (builder.hasCellCodecClsName()) {
200       clsName = builder.getCellCodecClsName();
201     }
202     return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
203  }
204
205  private String initInternal(FSDataInputStream stream, boolean isFirst)
206      throws IOException {
207    close();
208    long expectedPos = PB_WAL_MAGIC.length;
209    if (stream == null) {
210      stream = fs.open(path);
211      stream.seek(expectedPos);
212    }
213    if (stream.getPos() != expectedPos) {
214      throw new IOException("The stream is at invalid position: " + stream.getPos());
215    }
216    // Initialize metadata or, when we reset, just skip the header.
217    WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
218    WALHdrContext hdrCtxt = readHeader(builder, stream);
219    WALHdrResult walHdrRes = hdrCtxt.getResult();
220    if (walHdrRes == WALHdrResult.EOF) {
221      throw new EOFException("Couldn't read WAL PB header");
222    }
223    if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
224      throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
225    }
226    if (isFirst) {
227      WALProtos.WALHeader header = builder.build();
228      this.hasCompression = header.hasHasCompression() && header.getHasCompression();
229      this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
230    }
231    this.inputStream = stream;
232    this.walEditsStopOffset = this.fileLength;
233    long currentPosition = stream.getPos();
234    trailerPresent = setTrailerIfPresent();
235    this.seekOnFs(currentPosition);
236    if (LOG.isTraceEnabled()) {
237      LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
238          + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition);
239    }
240    
241    codecClsName = hdrCtxt.getCellCodecClsName();
242    
243    return hdrCtxt.getCellCodecClsName();
244  }
245
246  /**
247   * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
248   * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
249   * the trailer, and checks whether the trailer is present at the end or not by comparing the last
250   * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
251   * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
252   * before the trailer.
253   * <ul>
254   * The trailer is ignored in case:
255   * <li>fileLength is 0 or not correct (when file is under recovery, etc).
256   * <li>the trailer size is negative.
257   * </ul>
258   * <p>
259   * In case the trailer size > this.trailerMaxSize, it is read after a WARN message.
260   * @return true if a valid trailer is present
261   * @throws IOException
262   */
263  private boolean setTrailerIfPresent() {
264    try {
265      long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
266      if (trailerSizeOffset <= 0) return false;// no trailer possible.
267      this.seekOnFs(trailerSizeOffset);
268      // read the int as trailer size.
269      int trailerSize = this.inputStream.readInt();
270      ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
271      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
272      if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
273        LOG.trace("No trailer found.");
274        return false;
275      }
276      if (trailerSize < 0) {
277        LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
278        return false;
279      } else if (trailerSize > this.trailerWarnSize) {
280        // continue reading after warning the user.
281        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
282          + trailerSize + " > " + this.trailerWarnSize);
283      }
284      // seek to the position where trailer starts.
285      long positionOfTrailer = trailerSizeOffset - trailerSize;
286      this.seekOnFs(positionOfTrailer);
287      // read the trailer.
288      buf = ByteBuffer.allocate(trailerSize);// for trailer.
289      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
290      trailer = WALTrailer.parseFrom(buf.array());
291      this.walEditsStopOffset = positionOfTrailer;
292      return true;
293    } catch (IOException ioe) {
294      LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
295    }
296    return false;
297  }
298
299  protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
300      CompressionContext compressionContext) throws IOException {
301    return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
302  }
303
304  @Override
305  protected void initAfterCompression() throws IOException {
306    initAfterCompression(null);
307  }
308  
309  @Override
310  protected void initAfterCompression(String cellCodecClsName) throws IOException {
311    WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
312    this.cellDecoder = codec.getDecoder(this.inputStream);
313    if (this.hasCompression) {
314      this.byteStringUncompressor = codec.getByteStringUncompressor();
315    } else {
316      this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
317    }
318  }
319
320  @Override
321  protected boolean hasCompression() {
322    return this.hasCompression;
323  }
324
325  @Override
326  protected boolean hasTagCompression() {
327    return this.hasTagCompression;
328  }
329
330  @Override
331  protected boolean readNext(Entry entry) throws IOException {
332    while (true) {
333      // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
334      long originalPosition = this.inputStream.getPos();
335      if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
336        if (LOG.isTraceEnabled()) {
337          LOG.trace("Reached end of expected edits area at offset " + originalPosition);
338        }
339        return false;
340      }
341      WALKey.Builder builder = WALKey.newBuilder();
342      long size = 0;
343      boolean resetPosition = false;
344      try {
345        long available = -1;
346        try {
347          int firstByte = this.inputStream.read();
348          if (firstByte == -1) {
349            throw new EOFException("First byte is negative at offset " + originalPosition);
350          }
351          size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
352          // available may be < 0 on local fs for instance.  If so, can't depend on it.
353          available = this.inputStream.available();
354          if (available > 0 && available < size) {
355            throw new EOFException("Available stream not enough for edit, " +
356                "inputStream.available()= " + this.inputStream.available() + ", " +
357                "entry size= " + size + " at offset = " + this.inputStream.getPos());
358          }
359          ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),
360            (int)size);
361        } catch (InvalidProtocolBufferException ipbe) {
362          resetPosition = true;
363          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
364            originalPosition + ", currentPosition=" + this.inputStream.getPos() +
365            ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
366        }
367        if (!builder.isInitialized()) {
368          // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
369          //       If we can get the KV count, we could, theoretically, try to get next record.
370          throw new EOFException("Partial PB while reading WAL, " +
371              "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
372        }
373        WALKey walKey = builder.build();
374        entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
375        if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
376          if (LOG.isTraceEnabled()) {
377            LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +
378                this.inputStream.getPos());
379          }
380          seekOnFs(originalPosition);
381          return false;
382        }
383        int expectedCells = walKey.getFollowingKvCount();
384        long posBefore = this.inputStream.getPos();
385        try {
386          int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
387          if (expectedCells != actualCells) {
388            resetPosition = true;
389            throw new EOFException("Only read " + actualCells); // other info added in catch
390          }
391        } catch (Exception ex) {
392          String posAfterStr = "<unknown>";
393          try {
394            posAfterStr = this.inputStream.getPos() + "";
395          } catch (Throwable t) {
396            if (LOG.isTraceEnabled()) {
397              LOG.trace("Error getting pos for error message - ignoring", t);
398            }
399          }
400          String message = " while reading " + expectedCells + " WAL KVs; started reading at "
401              + posBefore + " and read up to " + posAfterStr;
402          IOException realEofEx = extractHiddenEof(ex);
403          throw (EOFException) new EOFException("EOF " + message).
404              initCause(realEofEx != null ? realEofEx : ex);
405        }
406        if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
407          LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
408              + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
409              + this.walEditsStopOffset);
410          throw new EOFException("Read WALTrailer while reading WALEdits");
411        }
412      } catch (EOFException eof) {
413        // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
414        if (originalPosition < 0) {
415          if (LOG.isTraceEnabled()) {
416            LOG.trace("Encountered a malformed edit, but can't seek back to last good position "
417                + "because originalPosition is negative. last offset="
418                + this.inputStream.getPos(), eof);
419          }
420          throw eof;
421        }
422        // If stuck at the same place and we got and exception, lets go back at the beginning.
423        if (inputStream.getPos() == originalPosition && resetPosition) {
424          if (LOG.isTraceEnabled()) {
425            LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since "
426                + "current position and original position match at " + originalPosition);
427          }
428          seekOnFs(0);
429        } else {
430          // Else restore our position to original location in hope that next time through we will
431          // read successfully.
432          if (LOG.isTraceEnabled()) {
433            LOG.trace("Encountered a malformed edit, seeking back to last good position in file, "
434                + "from " + inputStream.getPos()+" to " + originalPosition, eof);
435          }
436          seekOnFs(originalPosition);
437        }
438        return false;
439      }
440      return true;
441    }
442  }
443
444  private IOException extractHiddenEof(Exception ex) {
445    // There are two problems we are dealing with here. Hadoop stream throws generic exception
446    // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
447    IOException ioEx = null;
448    if (ex instanceof EOFException) {
449      return (EOFException)ex;
450    } else if (ex instanceof IOException) {
451      ioEx = (IOException)ex;
452    } else if (ex instanceof RuntimeException
453        && ex.getCause() != null && ex.getCause() instanceof IOException) {
454      ioEx = (IOException)ex.getCause();
455    }
456    if (ioEx != null) {
457      if (ioEx.getMessage().contains("EOF")) return ioEx;
458      return null;
459    }
460    return null;
461  }
462
463  @Override
464  protected void seekOnFs(long pos) throws IOException {
465    this.inputStream.seek(pos);
466  }
467}