/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

/**
 * A Protobuf based WAL has the following structure:
 * <p>
 * &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
 * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
 * </p>
 * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
 * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure
 * which is appended at the end of the WAL. This is empty for now; it can contain some meta
 * information such as Region level stats, etc in future.
 */
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
  HBaseInterfaceAudience.CONFIG})
public class ProtobufLogReader extends ReaderBase {
  private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogReader.class);
  // public for WALFactory until we move everything to o.a.h.h.wal
  @InterfaceAudience.Private
  public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
  // public for TestWALSplit
  @InterfaceAudience.Private
  public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
  /**
   * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
   * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
   */
  static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
  static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB

  protected FSDataInputStream inputStream;
  protected Codec.Decoder cellDecoder;
  protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
  protected boolean hasCompression = false;
  protected boolean hasTagCompression = false;
  // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
  // entry in the wal, the inputstream's position is equal to walEditsStopOffset.
  private long walEditsStopOffset;
  private boolean trailerPresent;
  protected WALTrailer trailer;
  // Warning threshold for the size of the wal Trailer in bytes. If a user writes/reads a trailer
  // with size larger than this size, it is written/read respectively, with a WARN message in the
  // log.
  protected int trailerWarnSize;
  private static final List<String> writerClsNames = new ArrayList<>();
  static {
    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
    writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
  }

  // cell codec classname
  private String codecClsName = null;

  /**
   * Returns the size of the WAL footer, measured as the distance between the end of the edits
   * region and the end of the file. A WARN is logged when that distance disagrees with the size
   * computed from the parsed trailer.
   * @return the footer size in bytes, or -1 if no trailer was found
   */
  @InterfaceAudience.Private
  public long trailerSize() {
    if (trailerPresent) {
      // sizeof PB_WAL_COMPLETE_MAGIC + sizeof trailerSize + trailer
      final long calculatedSize = (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT
          + trailer.getSerializedSize();
      final long expectedSize = fileLength - walEditsStopOffset;
      if (expectedSize != calculatedSize) {
        LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we "
            + "calculate it as being {}", expectedSize, calculatedSize);
      }
      return expectedSize;
    } else {
      return -1L;
    }
  }

  enum WALHdrResult {
    EOF,                   // stream is at EOF when method starts
    SUCCESS,
    UNKNOWN_WRITER_CLS     // name of writer class isn't recognized
  }

  // context for WALHdr carrying information such as Cell Codec classname
  static class WALHdrContext {
    WALHdrResult result;
    String cellCodecClsName;

    WALHdrContext(WALHdrResult result, String cellCodecClsName) {
      this.result = result;
      this.cellCodecClsName = cellCodecClsName;
    }

    WALHdrResult getResult() {
      return result;
    }

    String getCellCodecClsName() {
      return cellCodecClsName;
    }
  }

  public ProtobufLogReader() {
    super();
  }

  /**
   * Closes the underlying input stream, if one is open, and forgets it. Calling this on an
   * already-closed reader is a no-op.
   */
  @Override
  public void close() throws IOException {
    if (this.inputStream != null) {
      this.inputStream.close();
      this.inputStream = null;
    }
  }

  /**
   * Returns the current position of the underlying input stream. The reader must be open:
   * there is no null check on the stream here.
   */
  @Override
  public long getPosition() throws IOException {
    return inputStream.getPos();
  }

  /**
   * Re-opens the WAL file, skips the header again and builds a fresh decoder. The compression
   * flags read from the header at first initialization are kept as they are.
   */
  @Override
  public void reset() throws IOException {
    String clsName = initInternal(null, false);
    initAfterCompression(clsName); // We need a new decoder (at least).
  }

  /**
   * Reads the trailer warning threshold from the configuration and then delegates to the
   * superclass initialization.
   */
  @Override
  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
      throws IOException {
    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
    super.init(fs, path, conf, stream);
  }

  @Override
  protected String initReader(FSDataInputStream stream) throws IOException {
    return initInternal(stream, true);
  }

  /**
   * Returns names of the accepted writer classes
   */
  public List<String> getWriterClsNames() {
    return writerClsNames;
  }

  /**
   * Returns the cell codec classname
   */
  public String getCodecClsName() {
    return codecClsName;
  }

  /**
   * Reads the length-delimited WAL header from the stream into the given builder and checks
   * the writer class name it carries, if any, against {@link #getWriterClsNames()}.
   * @param builder receives the parsed header fields
   * @param stream the stream to read the header from
   * @return a context whose result is EOF when nothing could be read, UNKNOWN_WRITER_CLS when
   *         the header names a writer class that is not accepted, and SUCCESS otherwise; only
   *         the SUCCESS context carries the cell codec class name (null if the header has none)
   */
  protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
      throws IOException {
    boolean res = builder.mergeDelimitedFrom(stream);
    if (!res) {
      return new WALHdrContext(WALHdrResult.EOF, null);
    }
    if (builder.hasWriterClsName() &&
        !getWriterClsNames().contains(builder.getWriterClsName())) {
      return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
    }
    String clsName = null;
    if (builder.hasCellCodecClsName()) {
      clsName = builder.getCellCodecClsName();
    }
    return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
  }

  /**
   * Shared implementation of {@link #initReader(FSDataInputStream)} and {@link #reset()}:
   * closes any previous stream, validates the header, and locates the trailer.
   * @param stream the stream to read from, already positioned just past PB_WAL_MAGIC; when
   *          null the file is re-opened and positioned there by this method
   * @param isFirst true on first initialization, in which case the compression flags are taken
   *          from the header; on reset the header is only skipped
   * @return the cell codec class name from the header, or null if the header has none
   * @throws IOException if the stream is at an unexpected position, the header cannot be read,
   *           or the header names an unknown writer class
   */
  private String initInternal(FSDataInputStream stream, boolean isFirst)
      throws IOException {
    close();
    long expectedPos = PB_WAL_MAGIC.length;
    // A stream we open ourselves is owned by this method until it is stored in
    // this.inputStream; if validation fails before that point nobody else can close it.
    boolean openedHere = false;
    if (stream == null) {
      stream = fs.open(path);
      openedHere = true;
    }
    // Initialize metadata or, when we reset, just skip the header.
    WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
    WALHdrContext hdrCtxt;
    try {
      if (openedHere) {
        stream.seek(expectedPos);
      }
      if (stream.getPos() != expectedPos) {
        throw new IOException("The stream is at invalid position: " + stream.getPos());
      }
      hdrCtxt = readHeader(builder, stream);
      WALHdrResult walHdrRes = hdrCtxt.getResult();
      if (walHdrRes == WALHdrResult.EOF) {
        throw new EOFException("Couldn't read WAL PB header");
      }
      if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
        throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
      }
    } catch (IOException | RuntimeException e) {
      if (openedHere) {
        // Do not leak the stream opened above; caller-supplied streams are left untouched.
        try {
          stream.close();
        } catch (IOException closeEx) {
          e.addSuppressed(closeEx);
        }
      }
      throw e;
    }
    if (isFirst) {
      WALProtos.WALHeader header = builder.build();
      this.hasCompression = header.hasHasCompression() && header.getHasCompression();
      this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
    }
    this.inputStream = stream;
    this.walEditsStopOffset = this.fileLength;
    long currentPosition = stream.getPos();
    trailerPresent = setTrailerIfPresent();
    // Looking for the trailer moved the stream; go back to the first edit.
    this.seekOnFs(currentPosition);
    if (LOG.isTraceEnabled()) {
      LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
          + ", fileLength: " + this.fileLength + ", " + "trailerPresent: "
          + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false")
          + ", currentPosition: " + currentPosition);
    }

    codecClsName = hdrCtxt.getCellCodecClsName();

    return hdrCtxt.getCellCodecClsName();
  }

  /**
   * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
   * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
   * the trailer, and checks whether the trailer is present at the end or not by comparing the last
   * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
   * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
   * before the trailer.
   * <p>
   * The trailer is ignored in case:
   * <ul>
   * <li>fileLength is 0 or not correct (when file is under recovery, etc).</li>
   * <li>the trailer size is negative.</li>
   * <li>the trailer size is larger than the number of bytes that precede the size field.</li>
   * <li>an IOException occurs while reading it (a WARN is logged and false is returned).</li>
   * </ul>
   * <p>
   * In case the trailer size &gt; this.trailerWarnSize, it is read after a WARN message.
   * <p>
   * This method moves the stream position; the caller is responsible for seeking back.
   * @return true if a valid trailer is present
   */
  private boolean setTrailerIfPresent() {
    try {
      long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
      if (trailerSizeOffset <= 0) {
        return false; // no trailer possible.
      }
      this.seekOnFs(trailerSizeOffset);
      // read the int as trailer size.
      int trailerSize = this.inputStream.readInt();
      ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
      if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
        LOG.trace("No trailer found.");
        return false;
      }
      if (trailerSize < 0) {
        LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
        return false;
      } else if (trailerSize > trailerSizeOffset) {
        // The trailer would have to start before the beginning of the file; never seek to a
        // negative offset or allocate a buffer for a size the file cannot hold.
        LOG.warn("Invalid trailer Size " + trailerSize + " exceeds the " + trailerSizeOffset
            + " bytes preceding it, ignoring the trailer");
        return false;
      } else if (trailerSize > this.trailerWarnSize) {
        // continue reading after warning the user.
        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
            + trailerSize + " > " + this.trailerWarnSize);
      }
      // seek to the position where trailer starts.
      long positionOfTrailer = trailerSizeOffset - trailerSize;
      this.seekOnFs(positionOfTrailer);
      // read the trailer.
      buf = ByteBuffer.allocate(trailerSize); // for trailer.
      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
      trailer = WALTrailer.parseFrom(buf.array());
      this.walEditsStopOffset = positionOfTrailer;
      return true;
    } catch (IOException ioe) {
      LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
    }
    return false;
  }

  /**
   * Creates the cell codec used to decode WAL cells; delegates to
   * WALCellCodec#create(Configuration, String, CompressionContext).
   */
  protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
      CompressionContext compressionContext) throws IOException {
    return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
  }

  @Override
  protected void initAfterCompression() throws IOException {
    initAfterCompression(null);
  }

  /**
   * Builds the cell decoder over the current input stream and, when the WAL header declared
   * compression, the byte string uncompressor.
   * @param cellCodecClsName the cell codec class name, passed through to
   *          {@link #getCodec(Configuration, String, CompressionContext)}; may be null
   */
  @Override
  protected void initAfterCompression(String cellCodecClsName) throws IOException {
    WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
    this.cellDecoder = codec.getDecoder(this.inputStream);
    if (this.hasCompression) {
      this.byteStringUncompressor = codec.getByteStringUncompressor();
    }
  }

  @Override
  protected boolean hasCompression() {
    return this.hasCompression;
  }

  @Override
  protected boolean hasTagCompression() {
    return this.hasTagCompression;
  }

  /**
   * Reads the next WAL entry (key followed by its cells) into the given entry.
   * <p>
   * On an EOF-like failure the stream is seeked back to the position it had when this call
   * started (or to the beginning of the file when no progress was made and the failure asked for
   * a position reset), and false is returned so that a later call can retry.
   * @param entry the entry whose key and edit are filled in
   * @return true if an entry was read; false if the end of the edits region was reached, the key
   *         announced no cells, or an EOF-like failure occurred
   * @throws EOFException if an EOF-like failure occurred and the starting position reported by
   *           the stream was negative, so there is nowhere to seek back to
   * @throws IOException on other read failures
   */
  @Override
  protected boolean readNext(Entry entry) throws IOException {
    // Every path through the body returns or throws, so this loop runs at most once.
    while (true) {
      // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
      long originalPosition = this.inputStream.getPos();
      if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
        LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
        return false;
      }
      WALKey.Builder builder = WALKey.newBuilder();
      long size = 0;
      boolean resetPosition = false;
      try {
        long available = -1;
        try {
          int firstByte = this.inputStream.read();
          if (firstByte == -1) {
            throw new EOFException("First byte is negative at offset " + originalPosition);
          }
          size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
          // available may be < 0 on local fs for instance.  If so, can't depend on it.
          available = this.inputStream.available();
          if (available > 0 && available < size) {
            throw new EOFException("Available stream not enough for edit, " +
                "inputStream.available()= " + this.inputStream.available() + ", " +
                "entry size= " + size + " at offset = " + this.inputStream.getPos());
          }
          ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),
            (int)size);
        } catch (InvalidProtocolBufferException ipbe) {
          resetPosition = true;
          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
            originalPosition + ", currentPosition=" + this.inputStream.getPos() +
            ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
        }
        if (!builder.isInitialized()) {
          // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
          //       If we can get the KV count, we could, theoretically, try to get next record.
          throw new EOFException("Partial PB while reading WAL, " +
              "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
        }
        WALKey walKey = builder.build();
        entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
        if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
          LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
              this.inputStream.getPos());
          seekOnFs(originalPosition);
          return false;
        }
        int expectedCells = walKey.getFollowingKvCount();
        long posBefore = this.inputStream.getPos();
        try {
          int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
          if (expectedCells != actualCells) {
            resetPosition = true;
            throw new EOFException("Only read " + actualCells); // other info added in catch
          }
        } catch (Exception ex) {
          String posAfterStr = "<unknown>";
          try {
            posAfterStr = this.inputStream.getPos() + "";
          } catch (Throwable t) {
            LOG.trace("Error getting pos for error message - ignoring", t);
          }
          String message = " while reading " + expectedCells + " WAL KVs; started reading at "
              + posBefore + " and read up to " + posAfterStr;
          IOException realEofEx = extractHiddenEof(ex);
          throw (EOFException) new EOFException("EOF " + message).
              initCause(realEofEx != null ? realEofEx : ex);
        }
        if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
          LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
              + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
              + this.walEditsStopOffset);
          throw new EOFException("Read WALTrailer while reading WALEdits");
        }
      } catch (EOFException eof) {
        // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
        if (originalPosition < 0) {
          LOG.warn("Encountered a malformed edit, but can't seek back to last good position "
              + "because originalPosition is negative. last offset={}",
              this.inputStream.getPos(), eof);
          throw eof;
        }
        // If stuck at the same place and we got an exception, lets go back at the beginning.
        if (inputStream.getPos() == originalPosition && resetPosition) {
          LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
              + "current position and original position match at {}", originalPosition);
          seekOnFs(0);
        } else {
          // Else restore our position to original location in hope that next time through we will
          // read successfully.
          LOG.warn("Encountered a malformed edit, seeking back to last good position in file, "
              + "from {} to {}", inputStream.getPos(), originalPosition, eof);
          seekOnFs(originalPosition);
        }
        return false;
      }
      return true;
    }
  }

  /**
   * Digs an end-of-file condition out of the given exception.
   * @param ex the exception raised while reading cells
   * @return {@code ex} itself if it is an EOFException; otherwise the IOException (either
   *         {@code ex} or the direct cause of a RuntimeException) if its message contains "EOF";
   *         null in every other case, including an IOException without a message
   */
  private IOException extractHiddenEof(Exception ex) {
    // There are two problems we are dealing with here. Hadoop stream throws generic exception
    // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
    IOException ioEx = null;
    if (ex instanceof EOFException) {
      return (EOFException)ex;
    } else if (ex instanceof IOException) {
      ioEx = (IOException)ex;
    } else if (ex instanceof RuntimeException
        && ex.getCause() != null && ex.getCause() instanceof IOException) {
      ioEx = (IOException)ex.getCause();
    }
    if (ioEx != null) {
      // getMessage() is null for exceptions created without a detail message; treat that as
      // "not an EOF" rather than failing with a NullPointerException.
      String message = ioEx.getMessage();
      if (message != null && message.contains("EOF")) {
        return ioEx;
      }
    }
    return null;
  }

  @Override
  protected void seekOnFs(long pos) throws IOException {
    this.inputStream.seek(pos);
  }
}