001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.EOFException;
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FSDataInputStream;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseInterfaceAudience;
031import org.apache.hadoop.hbase.codec.Codec;
032import org.apache.hadoop.hbase.io.compress.Compression;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.wal.WAL.Entry;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
040import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
041import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
042
043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
048
049/**
050 * A Protobuf based WAL has the following structure:
051 * <p>
052 * &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
053 * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
054 * </p>
055 * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
056 * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure which is
057 * appended at the end of the WAL. This is empty for now; it can contain some meta information such
058 * as Region level stats, etc in future.
059 */
060@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
061  HBaseInterfaceAudience.CONFIG })
062public class ProtobufLogReader extends ReaderBase {
063  private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogReader.class);
064  // public for WALFactory until we move everything to o.a.h.h.wal
065  @InterfaceAudience.Private
066  public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
067  // public for TestWALSplit
068  @InterfaceAudience.Private
069  public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
070  /**
071   * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
072   * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
073   */
074  static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
075  static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
076
077  protected FSDataInputStream inputStream;
078  protected Codec.Decoder cellDecoder;
079  protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
080  protected boolean hasCompression = false;
081  protected boolean hasTagCompression = false;
082  protected boolean hasValueCompression = false;
083  protected Compression.Algorithm valueCompressionType = null;
084  // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
085  // entry in the wal, the inputstream's position is equal to walEditsStopOffset.
086  private long walEditsStopOffset;
087  private boolean trailerPresent;
088  protected WALTrailer trailer;
089  // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
090  // than this size, it is written/read respectively, with a WARN message in the log.
091  protected int trailerWarnSize;
092  private static List<String> writerClsNames = new ArrayList<>();
093  static {
094    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
095    writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
096  }
097
098  // cell codec classname
099  private String codecClsName = null;
100
101  @InterfaceAudience.Private
102  public long trailerSize() {
103    if (trailerPresent) {
104      // sizeof PB_WAL_COMPLETE_MAGIC + sizof trailerSize + trailer
105      final long calculatedSize =
106        (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
107      final long expectedSize = fileLength - walEditsStopOffset;
108      if (expectedSize != calculatedSize) {
109        LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we "
110          + "calculate it as being {}", expectedSize, calculatedSize);
111      }
112      return expectedSize;
113    } else {
114      return -1L;
115    }
116  }
117
118  enum WALHdrResult {
119    EOF, // stream is at EOF when method starts
120    SUCCESS,
121    UNKNOWN_WRITER_CLS // name of writer class isn't recognized
122  }
123
124  // context for WALHdr carrying information such as Cell Codec classname
125  static class WALHdrContext {
126    WALHdrResult result;
127    String cellCodecClsName;
128
129    WALHdrContext(WALHdrResult result, String cellCodecClsName) {
130      this.result = result;
131      this.cellCodecClsName = cellCodecClsName;
132    }
133
134    WALHdrResult getResult() {
135      return result;
136    }
137
138    String getCellCodecClsName() {
139      return cellCodecClsName;
140    }
141  }
142
143  public ProtobufLogReader() {
144    super();
145  }
146
147  @Override
148  public void close() throws IOException {
149    if (this.inputStream != null) {
150      this.inputStream.close();
151      this.inputStream = null;
152    }
153  }
154
155  @Override
156  public long getPosition() throws IOException {
157    return inputStream.getPos();
158  }
159
160  @Override
161  public void reset() throws IOException {
162    String clsName = initInternal(null, false);
163    initAfterCompression(clsName); // We need a new decoder (at least).
164  }
165
166  @Override
167  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
168    throws IOException {
169    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
170    super.init(fs, path, conf, stream);
171  }
172
173  @Override
174  protected String initReader(FSDataInputStream stream) throws IOException {
175    return initInternal(stream, true);
176  }
177
178  /*
179   * Returns names of the accepted writer classes
180   */
181  public List<String> getWriterClsNames() {
182    return writerClsNames;
183  }
184
185  /*
186   * Returns the cell codec classname
187   */
188  public String getCodecClsName() {
189    return codecClsName;
190  }
191
192  protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream) throws IOException {
193    boolean res = builder.mergeDelimitedFrom(stream);
194    if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
195    if (builder.hasWriterClsName() && !getWriterClsNames().contains(builder.getWriterClsName())) {
196      return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
197    }
198    String clsName = null;
199    if (builder.hasCellCodecClsName()) {
200      clsName = builder.getCellCodecClsName();
201    }
202    return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
203  }
204
205  private String initInternal(FSDataInputStream stream, boolean isFirst) throws IOException {
206    close();
207    if (!isFirst) {
208      // Re-compute the file length.
209      this.fileLength = fs.getFileStatus(path).getLen();
210    }
211    long expectedPos = PB_WAL_MAGIC.length;
212    if (stream == null) {
213      stream = fs.open(path);
214      stream.seek(expectedPos);
215    }
216    if (stream.getPos() != expectedPos) {
217      throw new IOException("The stream is at invalid position: " + stream.getPos());
218    }
219    // Initialize metadata or, when we reset, just skip the header.
220    WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
221    WALHdrContext hdrCtxt = readHeader(builder, stream);
222    WALHdrResult walHdrRes = hdrCtxt.getResult();
223    if (walHdrRes == WALHdrResult.EOF) {
224      throw new EOFException("Couldn't read WAL PB header");
225    }
226    if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
227      throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
228    }
229    if (isFirst) {
230      WALProtos.WALHeader header = builder.build();
231      this.hasCompression = header.hasHasCompression() && header.getHasCompression();
232      this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
233      this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression();
234      if (header.hasValueCompressionAlgorithm()) {
235        try {
236          this.valueCompressionType =
237            Compression.Algorithm.values()[header.getValueCompressionAlgorithm()];
238        } catch (ArrayIndexOutOfBoundsException e) {
239          throw new IOException("Invalid compression type", e);
240        }
241      }
242    }
243    this.inputStream = stream;
244    this.walEditsStopOffset = this.fileLength;
245    long currentPosition = stream.getPos();
246    trailerPresent = setTrailerIfPresent();
247    this.seekOnFs(currentPosition);
248    if (LOG.isTraceEnabled()) {
249      LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
250        + ", fileLength: " + this.fileLength + ", " + "trailerPresent: "
251        + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false")
252        + ", currentPosition: " + currentPosition);
253    }
254
255    codecClsName = hdrCtxt.getCellCodecClsName();
256
257    return hdrCtxt.getCellCodecClsName();
258  }
259
260  /**
261   * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
262   * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
263   * the trailer, and checks whether the trailer is present at the end or not by comparing the last
264   * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
265   * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
266   * before the trailer.
267   * <ul>
268   * The trailer is ignored in case:
269   * <li>fileLength is 0 or not correct (when file is under recovery, etc).
270   * <li>the trailer size is negative.
271   * </ul>
272   * <p>
273   * In case the trailer size > this.trailerMaxSize, it is read after a WARN message.
274   * @return true if a valid trailer is present n
275   */
276  private boolean setTrailerIfPresent() {
277    try {
278      long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
279      if (trailerSizeOffset <= 0) return false;// no trailer possible.
280      this.seekOnFs(trailerSizeOffset);
281      // read the int as trailer size.
282      int trailerSize = this.inputStream.readInt();
283      ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
284      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
285      if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
286        LOG.trace("No trailer found.");
287        return false;
288      }
289      if (trailerSize < 0) {
290        LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
291        return false;
292      } else if (trailerSize > this.trailerWarnSize) {
293        // continue reading after warning the user.
294        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
295          + trailerSize + " > " + this.trailerWarnSize);
296      }
297      // seek to the position where trailer starts.
298      long positionOfTrailer = trailerSizeOffset - trailerSize;
299      this.seekOnFs(positionOfTrailer);
300      // read the trailer.
301      buf = ByteBuffer.allocate(trailerSize);// for trailer.
302      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
303      trailer = WALTrailer.parseFrom(buf.array());
304      this.walEditsStopOffset = positionOfTrailer;
305      return true;
306    } catch (IOException ioe) {
307      LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
308    }
309    return false;
310  }
311
312  protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
313    CompressionContext compressionContext) throws IOException {
314    return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
315  }
316
317  @Override
318  protected void initAfterCompression() throws IOException {
319    initAfterCompression(null);
320  }
321
322  @Override
323  protected void initAfterCompression(String cellCodecClsName) throws IOException {
324    WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
325    this.cellDecoder = codec.getDecoder(this.inputStream);
326    if (this.hasCompression) {
327      this.byteStringUncompressor = codec.getByteStringUncompressor();
328    } else {
329      this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
330    }
331  }
332
333  @Override
334  protected boolean hasCompression() {
335    return this.hasCompression;
336  }
337
338  @Override
339  protected boolean hasTagCompression() {
340    return this.hasTagCompression;
341  }
342
343  @Override
344  protected boolean hasValueCompression() {
345    return this.hasValueCompression;
346  }
347
348  @Override
349  protected Compression.Algorithm getValueCompressionAlgorithm() {
350    return this.valueCompressionType;
351  }
352
353  @Override
354  protected boolean readNext(Entry entry) throws IOException {
355    // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
356    long originalPosition = this.inputStream.getPos();
357    if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
358      LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
359      return false;
360    }
361    WALKey.Builder builder = WALKey.newBuilder();
362    long size = 0;
363    boolean resetPosition = false;
364    try {
365      long available = -1;
366      try {
367        int firstByte = this.inputStream.read();
368        if (firstByte == -1) {
369          throw new EOFException();
370        }
371        size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
372        // available may be < 0 on local fs for instance. If so, can't depend on it.
373        available = this.inputStream.available();
374        if (available > 0 && available < size) {
375          throw new EOFException("Available stream not enough for edit, "
376            + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
377            + size + " at offset = " + this.inputStream.getPos());
378        }
379        ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
380      } catch (InvalidProtocolBufferException ipbe) {
381        resetPosition = true;
382        throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
383          + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
384          + size + ", currentAvailable=" + available).initCause(ipbe);
385      }
386      if (!builder.isInitialized()) {
387        // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
388        // If we can get the KV count, we could, theoretically, try to get next record.
389        throw new EOFException("Partial PB while reading WAL, "
390          + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
391      }
392      WALKey walKey = builder.build();
393      entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
394      if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
395        LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
396          this.inputStream.getPos());
397        seekOnFs(originalPosition);
398        return false;
399      }
400      int expectedCells = walKey.getFollowingKvCount();
401      long posBefore = this.inputStream.getPos();
402      try {
403        int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
404        if (expectedCells != actualCells) {
405          resetPosition = true;
406          throw new EOFException("Only read " + actualCells); // other info added in catch
407        }
408      } catch (Exception ex) {
409        String posAfterStr = "<unknown>";
410        try {
411          posAfterStr = this.inputStream.getPos() + "";
412        } catch (Throwable t) {
413          LOG.trace("Error getting pos for error message - ignoring", t);
414        }
415        String message = " while reading " + expectedCells + " WAL KVs; started reading at "
416          + posBefore + " and read up to " + posAfterStr;
417        IOException realEofEx = extractHiddenEof(ex);
418        throw (EOFException) new EOFException("EOF " + message)
419          .initCause(realEofEx != null ? realEofEx : ex);
420      }
421      if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
422        LOG.error(
423          "Read WALTrailer while reading WALEdits. wal: " + this.path + ", inputStream.getPos(): "
424            + this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset);
425        throw new EOFException("Read WALTrailer while reading WALEdits");
426      }
427    } catch (EOFException eof) {
428      // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
429      if (originalPosition < 0) {
430        LOG.debug(
431          "Encountered a malformed edit, but can't seek back to last good position "
432            + "because originalPosition is negative. last offset={}",
433          this.inputStream.getPos(), eof);
434        throw eof;
435      }
436      // If stuck at the same place and we got an exception, lets go back at the beginning.
437      if (inputStream.getPos() == originalPosition) {
438        if (resetPosition) {
439          LOG.debug("Encountered a malformed edit, seeking to the beginning of the WAL since "
440            + "current position and original position match at {}", originalPosition);
441          seekOnFs(0);
442        } else {
443          LOG.debug("EOF at position {}", originalPosition);
444        }
445      } else {
446        // Else restore our position to original location in hope that next time through we will
447        // read successfully.
448        LOG.debug("Encountered a malformed edit, seeking back to last good position in file, "
449          + "from {} to {}", inputStream.getPos(), originalPosition, eof);
450        seekOnFs(originalPosition);
451      }
452      return false;
453    }
454    return true;
455  }
456
457  private IOException extractHiddenEof(Exception ex) {
458    // There are two problems we are dealing with here. Hadoop stream throws generic exception
459    // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
460    IOException ioEx = null;
461    if (ex instanceof EOFException) {
462      return (EOFException) ex;
463    } else if (ex instanceof IOException) {
464      ioEx = (IOException) ex;
465    } else if (
466      ex instanceof RuntimeException && ex.getCause() != null
467        && ex.getCause() instanceof IOException
468    ) {
469      ioEx = (IOException) ex.getCause();
470    }
471    if ((ioEx != null) && (ioEx.getMessage() != null)) {
472      if (ioEx.getMessage().contains("EOF")) return ioEx;
473      return null;
474    }
475    return null;
476  }
477
478  @Override
479  protected void seekOnFs(long pos) throws IOException {
480    this.inputStream.seek(pos);
481  }
482}