/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.LimitInputStream;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * A Protobuf-based WAL has the following structure:
 * <p>
 * &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
 * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
 * </p>
 * The Reader reads meta information (WAL compression state, WALTrailer, etc.) in
 * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure
 * which is appended at the end of the WAL. It is empty for now; in the future it can carry
 * meta information such as Region-level stats.
 */
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
  HBaseInterfaceAudience.CONFIG})
public class ProtobufLogReader extends ReaderBase {
  private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
  // public for WALFactory until we move everything to o.a.h.h.wal
  @InterfaceAudience.Private
  public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
  // public for TestWALSplit
  @InterfaceAudience.Private
  public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
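  // Orientation sketch of the on-disk layout described in the class comment (sizes follow from
  // the constants above and the arithmetic in trailerSize()/setTrailerIfPresent()):
  //
  //   PB_WAL_MAGIC ("PWAL")                         4 bytes
  //   WALHeader                                     varint length + protobuf message
  //   WALEdit entries (WALKey + cells), repeated    up to walEditsStopOffset
  //   WALTrailer (optional)                         trailer's serialized bytes
  //   trailer size                                  4-byte int
  //   PB_WAL_COMPLETE_MAGIC ("LAWP")                4 bytes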
  /**
   * Configuration name of the WAL trailer's warning size. If a WALTrailer's size is greater than
   * the configured size, providers should log a warning; e.g., this is used with the Protobuf
   * reader/writer.
   */
  static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
  static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
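  // Illustrative only; the warning threshold can be raised via the configuration, e.g.:
  //   conf.setInt("hbase.regionserver.waltrailer.warn.size", 2 * 1024 * 1024); // 2MB, example value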

  protected FSDataInputStream inputStream;
  protected Codec.Decoder cellDecoder;
  protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
  protected boolean hasCompression = false;
  protected boolean hasTagCompression = false;
  // walEditsStopOffset is the offset at which the WALEdit data ends (i.e. where the trailer, if
  // any, begins). After reading the last WALEdit entry in the WAL, the input stream's position
  // is equal to walEditsStopOffset.
  private long walEditsStopOffset;
  private boolean trailerPresent;
  protected WALTrailer trailer;
  // Maximum expected size of the WAL trailer, in bytes. A trailer larger than this is still
  // written/read, but a WARN message is logged.
  protected int trailerWarnSize;
  private static List<String> writerClsNames = new ArrayList<String>();
  static {
    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
  }

  // cell codec classname
  private String codecClsName = null;

  @InterfaceAudience.Private
  public long trailerSize() {
    if (trailerPresent) {
      // sizeof PB_WAL_COMPLETE_MAGIC + sizeof trailerSize + trailer
      final long calculatedSize =
          PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
      final long expectedSize = fileLength - walEditsStopOffset;
      if (expectedSize != calculatedSize) {
        LOG.warn("After parsing the trailer, we expect the total footer to be " + expectedSize
            + " bytes, but we calculate it as being " + calculatedSize);
      }
      return expectedSize;
    } else {
      return -1L;
    }
  }

  enum WALHdrResult {
    EOF,                   // stream is at EOF when method starts
    SUCCESS,
    UNKNOWN_WRITER_CLS     // name of writer class isn't recognized
  }

  // context for WALHdr carrying information such as Cell Codec classname
  static class WALHdrContext {
    WALHdrResult result;
    String cellCodecClsName;

    WALHdrContext(WALHdrResult result, String cellCodecClsName) {
      this.result = result;
      this.cellCodecClsName = cellCodecClsName;
    }
    WALHdrResult getResult() {
      return result;
    }
    String getCellCodecClsName() {
      return cellCodecClsName;
    }
  }

  public ProtobufLogReader() {
    super();
  }

  @Override
  public void close() throws IOException {
    if (this.inputStream != null) {
      this.inputStream.close();
      this.inputStream = null;
    }
  }

  @Override
  public long getPosition() throws IOException {
    return inputStream.getPos();
  }

  @Override
  public void reset() throws IOException {
    String clsName = initInternal(null, false);
    initAfterCompression(clsName); // We need a new decoder (at least).
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
      throws IOException {
    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
    super.init(fs, path, conf, stream);
  }

  @Override
  protected String initReader(FSDataInputStream stream) throws IOException {
    return initInternal(stream, true);
  }

  /**
   * Returns names of the accepted writer classes.
   */
  public List<String> getWriterClsNames() {
    return writerClsNames;
  }

  /**
   * Returns the cell codec classname.
   */
  public String getCodecClsName() {
    return codecClsName;
  }

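  // The header is a delimited protobuf record: mergeDelimitedFrom() below consumes a varint
  // length prefix followed by that many bytes of WALHeader message.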
  protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
      throws IOException {
    boolean res = builder.mergeDelimitedFrom(stream);
    if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
    if (builder.hasWriterClsName() &&
        !getWriterClsNames().contains(builder.getWriterClsName())) {
      return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
    }
    String clsName = null;
    if (builder.hasCellCodecClsName()) {
      clsName = builder.getCellCodecClsName();
    }
    return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
  }

  private String initInternal(FSDataInputStream stream, boolean isFirst)
      throws IOException {
    close();
    long expectedPos = PB_WAL_MAGIC.length;
    if (stream == null) {
      stream = fs.open(path);
      stream.seek(expectedPos);
    }
    if (stream.getPos() != expectedPos) {
      throw new IOException("The stream is at invalid position: " + stream.getPos());
    }
    // Initialize metadata or, when we reset, just skip the header.
    WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
    WALHdrContext hdrCtxt = readHeader(builder, stream);
    WALHdrResult walHdrRes = hdrCtxt.getResult();
    if (walHdrRes == WALHdrResult.EOF) {
      throw new EOFException("Couldn't read WAL PB header");
    }
    if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
      throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
    }
    if (isFirst) {
      WALProtos.WALHeader header = builder.build();
      this.hasCompression = header.hasHasCompression() && header.getHasCompression();
      this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
    }
    this.inputStream = stream;
    this.walEditsStopOffset = this.fileLength;
    long currentPosition = stream.getPos();
    trailerPresent = setTrailerIfPresent();
    this.seekOnFs(currentPosition);
    if (LOG.isTraceEnabled()) {
      LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
          + ", fileLength: " + this.fileLength + ", trailerPresent: "
          + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false")
          + ", currentPosition: " + currentPosition);
    }

    codecClsName = hdrCtxt.getCellCodecClsName();

    return codecClsName;
  }

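  // Tail-of-file layout probed by setTrailerIfPresent(), as a sketch (offsets are relative to
  // fileLength; PB_WAL_COMPLETE_MAGIC.length and Bytes.SIZEOF_INT are both 4):
  //
  //   fileLength - trailerSize - 8 : WALTrailer bytes (walEditsStopOffset points here)
  //   fileLength - 8               : trailer size, 4-byte int
  //   fileLength - 4               : PB_WAL_COMPLETE_MAGIC ("LAWP")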
  /**
   * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
   * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
   * the trailer, and checks whether the trailer is present at the end or not by comparing the last
   * PB_WAL_COMPLETE_MAGIC.size() bytes. In case the trailer is not present, it returns false;
   * otherwise, it sets the trailer and sets this.walEditsStopOffset to the point just before the
   * trailer.
   * <ul>
   * The trailer is ignored in case:
   * <li>fileLength is 0 or not correct (when the file is under recovery, etc.).
   * <li>the trailer size is negative.
   * </ul>
   * <p>
   * In case the trailer size is greater than this.trailerWarnSize, it is still read, after a WARN
   * message is logged.
   * @return true if a valid trailer is present
   */
  private boolean setTrailerIfPresent() {
    try {
      long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
      if (trailerSizeOffset <= 0) return false; // no trailer possible.
      this.seekOnFs(trailerSizeOffset);
      // read the int as trailer size.
      int trailerSize = this.inputStream.readInt();
      ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
      if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
        LOG.trace("No trailer found.");
        return false;
      }
      if (trailerSize < 0) {
        LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
        return false;
      } else if (trailerSize > this.trailerWarnSize) {
        // continue reading after warning the user.
        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
          + trailerSize + " > " + this.trailerWarnSize);
      }
      // seek to the position where the trailer starts.
      long positionOfTrailer = trailerSizeOffset - trailerSize;
      this.seekOnFs(positionOfTrailer);
      // read the trailer.
      buf = ByteBuffer.allocate(trailerSize); // for trailer.
      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
      trailer = WALTrailer.parseFrom(buf.array());
      this.walEditsStopOffset = positionOfTrailer;
      return true;
    } catch (IOException ioe) {
      LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
    }
    return false;
  }

  protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
      CompressionContext compressionContext) throws IOException {
    return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
  }

  @Override
  protected void initAfterCompression() throws IOException {
    initAfterCompression(null);
  }

  @Override
  protected void initAfterCompression(String cellCodecClsName) throws IOException {
    WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
    this.cellDecoder = codec.getDecoder(this.inputStream);
    if (this.hasCompression) {
      this.byteStringUncompressor = codec.getByteStringUncompressor();
    }
  }

  @Override
  protected boolean hasCompression() {
    return this.hasCompression;
  }

  @Override
  protected boolean hasTagCompression() {
    return this.hasTagCompression;
  }

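  // Each entry in the edits area is framed as a varint length, a WALKey protobuf message of that
  // length, and then getFollowingKvCount() cells decoded via the cell decoder. On a partial or
  // malformed entry, readNext() seeks back to the entry's start so a tailing reader can retry.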
  @Override
  protected boolean readNext(Entry entry) throws IOException {
    while (true) {
      // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
      long originalPosition = this.inputStream.getPos();
      if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Reached end of expected edits area at offset " + originalPosition);
        }
        return false;
      }
      WALKey.Builder builder = WALKey.newBuilder();
      long size = 0;
      try {
        long available = -1;
        try {
          int firstByte = this.inputStream.read();
          if (firstByte == -1) {
            throw new EOFException("First byte is negative at offset " + originalPosition);
          }
          size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
          // available may be < 0 on local fs for instance.  If so, can't depend on it.
          available = this.inputStream.available();
          if (available > 0 && available < size) {
            throw new EOFException("Available stream not enough for edit, " +
                "inputStream.available()= " + this.inputStream.available() + ", " +
                "entry size= " + size + " at offset = " + this.inputStream.getPos());
          }
          ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
            (int)size);
        } catch (InvalidProtocolBufferException ipbe) {
          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
            originalPosition + ", currentPosition=" + this.inputStream.getPos() +
            ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
        }
        if (!builder.isInitialized()) {
          // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
          //       If we can get the KV count, we could, theoretically, try to get next record.
          throw new EOFException("Partial PB while reading WAL, " +
              "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
        }
        WALKey walKey = builder.build();
        entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
        if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset="
                + this.inputStream.getPos());
          }
          continue;
        }
        int expectedCells = walKey.getFollowingKvCount();
        long posBefore = this.inputStream.getPos();
        try {
          int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
          if (expectedCells != actualCells) {
            throw new EOFException("Only read " + actualCells); // other info added in catch
          }
        } catch (Exception ex) {
          String posAfterStr = "<unknown>";
          try {
            posAfterStr = this.inputStream.getPos() + "";
          } catch (Throwable t) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Error getting pos for error message - ignoring", t);
            }
          }
          String message = " while reading " + expectedCells + " WAL KVs; started reading at "
              + posBefore + " and read up to " + posAfterStr;
          IOException realEofEx = extractHiddenEof(ex);
          throw (EOFException) new EOFException("EOF " + message).
              initCause(realEofEx != null ? realEofEx : ex);
        }
        if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
          LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
              + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
              + this.walEditsStopOffset);
          throw new EOFException("Read WALTrailer while reading WALEdits");
        }
      } catch (EOFException eof) {
        // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
        if (originalPosition < 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Encountered a malformed edit, but can't seek back to last good position"
                + " because originalPosition is negative. last offset="
                + this.inputStream.getPos(), eof);
          }
          throw eof;
        }
        // Else restore our position to original location in hope that next time through we will
        // read successfully.
        if (LOG.isTraceEnabled()) {
          LOG.trace("Encountered a malformed edit, seeking back to last good position in file, "
              + "from " + inputStream.getPos() + " to " + originalPosition, eof);
        }
        seekOnFs(originalPosition);
        return false;
      }
      return true;
    }
  }

  private IOException extractHiddenEof(Exception ex) {
    // There are two problems we are dealing with here. Hadoop stream throws generic exception
    // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
    IOException ioEx = null;
    if (ex instanceof EOFException) {
      return (EOFException)ex;
    } else if (ex instanceof IOException) {
      ioEx = (IOException)ex;
    } else if (ex instanceof RuntimeException
        && ex.getCause() != null && ex.getCause() instanceof IOException) {
      ioEx = (IOException)ex.getCause();
    }
    if (ioEx != null) {
      // Guard against exceptions that carry a null message.
      if (ioEx.getMessage() != null && ioEx.getMessage().contains("EOF")) return ioEx;
      return null;
    }
    return null;
  }

  @Override
  protected void seekOnFs(long pos) throws IOException {
    this.inputStream.seek(pos);
  }
}