001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.EOFException; 021import java.io.IOException; 022import java.io.InputStream; 023import org.apache.hadoop.fs.FSDataInputStream; 024import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 025import org.apache.hadoop.hbase.wal.WAL.Entry; 026import org.apache.hadoop.hbase.wal.WALStreamReader; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 032 033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 035 036/** 037 * A one way stream reader for reading protobuf based WAL file. 038 */ 039@InterfaceAudience.Private 040public class ProtobufWALStreamReader extends AbstractProtobufWALReader 041 implements WALStreamReader, AbstractFSWALProvider.Initializer { 042 043 private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class); 044 045 @Override 046 public Entry next(Entry reuse) throws IOException { 047 long originalPosition = getPosition(); 048 if (reachWALEditsStopOffset(originalPosition)) { 049 return null; 050 } 051 WALProtos.WALKey walKey; 052 try { 053 // for one way stream reader, we do not care about what is the exact position where we hit the 054 // EOF or IOE, so just use the helper method to parse WALKey, in tailing reader, we will try 055 // to read the varint size by ourselves 056 walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALProtos.WALKey.parser()); 057 } catch (InvalidProtocolBufferException e) { 058 if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) { 059 // InvalidProtocolBufferException.truncatedMessage, should throw EOF 060 // or we have started to read the partial WALTrailer 061 throw (EOFException) new EOFException("EOF while reading WALKey, originalPosition=" 062 + originalPosition + ", currentPosition=" + inputStream.getPos()).initCause(e); 063 } else { 064 // For all other type of IPBEs, it means the WAL key is broken, throw IOException out to let 065 // the upper layer know, unless we have already reached the partial WALTrailer 066 throw (IOException) new IOException("Error while reading WALKey, originalPosition=" 067 + originalPosition + ", currentPosition=" + inputStream.getPos()).initCause(e); 068 } 069 } 070 Entry entry = reuse; 071 if (entry == null) { 072 entry = new Entry(); 073 } 074 entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor); 075 if (!walKey.hasFollowingKvCount() || walKey.getFollowingKvCount() == 0) { 076 LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}", 077 inputStream.getPos()); 078 return entry; 079 } 080 int expectedCells = walKey.getFollowingKvCount(); 081 long posBefore = getPosition(); 082 int actualCells; 083 try { 084 actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); 085 } catch (Exception e) { 086 String message = " while reading " + expectedCells + " WAL KVs; started reading at " 087 + posBefore + " and read up to " + getPositionQuietly(); 088 IOException realEofEx = extractHiddenEof(e); 089 if (realEofEx != null) { 090 throw (EOFException) new EOFException("EOF " + message).initCause(realEofEx); 091 } else { 092 // do not throw EOFException as it could be other type of errors, throwing EOF will cause 093 // the upper layer to consider the file has been fully read and cause data loss. 094 throw new IOException("Error " + message, e); 095 } 096 } 097 if (expectedCells != actualCells) { 098 throw new EOFException("Only read " + actualCells + " cells, expected " + expectedCells 099 + "; started reading at " + posBefore + " and read up to " + getPositionQuietly()); 100 } 101 long posAfter = this.inputStream.getPos(); 102 if (trailerPresent && posAfter > this.walEditsStopOffset) { 103 LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {}," 104 + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset); 105 throw new EOFException("Read WALTrailer while reading WALEdits; started reading at " 106 + posBefore + " and read up to " + posAfter); 107 } 108 return entry; 109 } 110 111 @Override 112 protected InputStream getCellCodecInputStream(FSDataInputStream stream) { 113 // just return the original input stream 114 return stream; 115 } 116 117 @Override 118 protected void skipTo(long position) throws IOException { 119 Entry entry = new Entry(); 120 for (;;) { 121 entry = next(entry); 122 if (entry == null) { 123 throw new EOFException("Can not skip to the given position " + position 124 + " as we have already reached the end of file"); 125 } 126 long pos = inputStream.getPos(); 127 if (pos > position) { 128 throw new IOException("Can not skip to the given position " + position + ", stopped at " 129 + pos + " which is already beyond the give position, malformed WAL?"); 130 } 131 if (pos == position) { 132 return; 133 } 134 } 135 } 136}