001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.EOFException; 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FSDataInputStream; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseInterfaceAudience; 031import org.apache.hadoop.hbase.codec.Codec; 032import org.apache.hadoop.hbase.io.compress.Compression; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.wal.WAL.Entry; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; 040import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 041import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 042 043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; 048 049/** 050 * A Protobuf based WAL has the following structure: 051 * <p> 052 * <PB_WAL_MAGIC><WALHeader><WALEdits>...<WALEdits><Trailer> 053 * <TrailerSize> <PB_WAL_COMPLETE_MAGIC> 054 * </p> 055 * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in 056 * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure which is 057 * appended at the end of the WAL. This is empty for now; it can contain some meta information such 058 * as Region level stats, etc in future. 059 */ 060@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, 061 HBaseInterfaceAudience.CONFIG }) 062public class ProtobufLogReader extends ReaderBase { 063 private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogReader.class); 064 // public for WALFactory until we move everything to o.a.h.h.wal 065 @InterfaceAudience.Private 066 public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL"); 067 // public for TestWALSplit 068 @InterfaceAudience.Private 069 public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP"); 070 /** 071 * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the 072 * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer. 073 */ 074 static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size"; 075 static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB 076 077 protected FSDataInputStream inputStream; 078 protected Codec.Decoder cellDecoder; 079 protected WALCellCodec.ByteStringUncompressor byteStringUncompressor; 080 protected boolean hasCompression = false; 081 protected boolean hasTagCompression = false; 082 protected boolean hasValueCompression = false; 083 protected Compression.Algorithm valueCompressionType = null; 084 // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit 085 // entry in the wal, the inputstream's position is equal to walEditsStopOffset. 086 private long walEditsStopOffset; 087 private boolean trailerPresent; 088 protected WALTrailer trailer; 089 // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger 090 // than this size, it is written/read respectively, with a WARN message in the log. 091 protected int trailerWarnSize; 092 private static List<String> writerClsNames = new ArrayList<>(); 093 static { 094 writerClsNames.add(ProtobufLogWriter.class.getSimpleName()); 095 writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName()); 096 } 097 098 // cell codec classname 099 private String codecClsName = null; 100 101 @InterfaceAudience.Private 102 public long trailerSize() { 103 if (trailerPresent) { 104 // sizeof PB_WAL_COMPLETE_MAGIC + sizof trailerSize + trailer 105 final long calculatedSize = 106 (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize(); 107 final long expectedSize = fileLength - walEditsStopOffset; 108 if (expectedSize != calculatedSize) { 109 LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we " 110 + "calculate it as being {}", expectedSize, calculatedSize); 111 } 112 return expectedSize; 113 } else { 114 return -1L; 115 } 116 } 117 118 enum WALHdrResult { 119 EOF, // stream is at EOF when method starts 120 SUCCESS, 121 UNKNOWN_WRITER_CLS // name of writer class isn't recognized 122 } 123 124 // context for WALHdr carrying information such as Cell Codec classname 125 static class WALHdrContext { 126 WALHdrResult result; 127 String cellCodecClsName; 128 129 WALHdrContext(WALHdrResult result, String cellCodecClsName) { 130 this.result = result; 131 this.cellCodecClsName = cellCodecClsName; 132 } 133 134 WALHdrResult getResult() { 135 return result; 136 } 137 138 String getCellCodecClsName() { 139 return cellCodecClsName; 140 } 141 } 142 143 public ProtobufLogReader() { 144 super(); 145 } 146 147 @Override 148 public void close() throws IOException { 149 if (this.inputStream != null) { 150 this.inputStream.close(); 151 this.inputStream = null; 152 } 153 } 154 155 @Override 156 public long getPosition() throws IOException { 157 return inputStream.getPos(); 158 } 159 160 @Override 161 public void reset() throws IOException { 162 String clsName = initInternal(null, false); 163 initAfterCompression(clsName); // We need a new decoder (at least). 164 } 165 166 @Override 167 public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream) 168 throws IOException { 169 this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); 170 super.init(fs, path, conf, stream); 171 } 172 173 @Override 174 protected String initReader(FSDataInputStream stream) throws IOException { 175 return initInternal(stream, true); 176 } 177 178 /* 179 * Returns names of the accepted writer classes 180 */ 181 public List<String> getWriterClsNames() { 182 return writerClsNames; 183 } 184 185 /* 186 * Returns the cell codec classname 187 */ 188 public String getCodecClsName() { 189 return codecClsName; 190 } 191 192 protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream) throws IOException { 193 boolean res = builder.mergeDelimitedFrom(stream); 194 if (!res) return new WALHdrContext(WALHdrResult.EOF, null); 195 if (builder.hasWriterClsName() && !getWriterClsNames().contains(builder.getWriterClsName())) { 196 return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null); 197 } 198 String clsName = null; 199 if (builder.hasCellCodecClsName()) { 200 clsName = builder.getCellCodecClsName(); 201 } 202 return new WALHdrContext(WALHdrResult.SUCCESS, clsName); 203 } 204 205 private String initInternal(FSDataInputStream stream, boolean isFirst) throws IOException { 206 close(); 207 if (!isFirst) { 208 // Re-compute the file length. 209 this.fileLength = fs.getFileStatus(path).getLen(); 210 } 211 long expectedPos = PB_WAL_MAGIC.length; 212 if (stream == null) { 213 stream = fs.open(path); 214 stream.seek(expectedPos); 215 } 216 if (stream.getPos() != expectedPos) { 217 throw new IOException("The stream is at invalid position: " + stream.getPos()); 218 } 219 // Initialize metadata or, when we reset, just skip the header. 220 WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder(); 221 WALHdrContext hdrCtxt = readHeader(builder, stream); 222 WALHdrResult walHdrRes = hdrCtxt.getResult(); 223 if (walHdrRes == WALHdrResult.EOF) { 224 throw new EOFException("Couldn't read WAL PB header"); 225 } 226 if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) { 227 throw new IOException("Got unknown writer class: " + builder.getWriterClsName()); 228 } 229 if (isFirst) { 230 WALProtos.WALHeader header = builder.build(); 231 this.hasCompression = header.hasHasCompression() && header.getHasCompression(); 232 this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression(); 233 this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression(); 234 if (header.hasValueCompressionAlgorithm()) { 235 try { 236 this.valueCompressionType = 237 Compression.Algorithm.values()[header.getValueCompressionAlgorithm()]; 238 } catch (ArrayIndexOutOfBoundsException e) { 239 throw new IOException("Invalid compression type", e); 240 } 241 } 242 } 243 this.inputStream = stream; 244 this.walEditsStopOffset = this.fileLength; 245 long currentPosition = stream.getPos(); 246 trailerPresent = setTrailerIfPresent(); 247 this.seekOnFs(currentPosition); 248 if (LOG.isTraceEnabled()) { 249 LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset 250 + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " 251 + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") 252 + ", currentPosition: " + currentPosition); 253 } 254 255 codecClsName = hdrCtxt.getCellCodecClsName(); 256 257 return hdrCtxt.getCellCodecClsName(); 258 } 259 260 /** 261 * To check whether a trailer is present in a WAL, it seeks to position (fileLength - 262 * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of 263 * the trailer, and checks whether the trailer is present at the end or not by comparing the last 264 * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false; 265 * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just 266 * before the trailer. 267 * <ul> 268 * The trailer is ignored in case: 269 * <li>fileLength is 0 or not correct (when file is under recovery, etc). 270 * <li>the trailer size is negative. 271 * </ul> 272 * <p> 273 * In case the trailer size > this.trailerMaxSize, it is read after a WARN message. 274 * @return true if a valid trailer is present n 275 */ 276 private boolean setTrailerIfPresent() { 277 try { 278 long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT); 279 if (trailerSizeOffset <= 0) return false;// no trailer possible. 280 this.seekOnFs(trailerSizeOffset); 281 // read the int as trailer size. 282 int trailerSize = this.inputStream.readInt(); 283 ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length); 284 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity()); 285 if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) { 286 LOG.trace("No trailer found."); 287 return false; 288 } 289 if (trailerSize < 0) { 290 LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer"); 291 return false; 292 } else if (trailerSize > this.trailerWarnSize) { 293 // continue reading after warning the user. 294 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : " 295 + trailerSize + " > " + this.trailerWarnSize); 296 } 297 // seek to the position where trailer starts. 298 long positionOfTrailer = trailerSizeOffset - trailerSize; 299 this.seekOnFs(positionOfTrailer); 300 // read the trailer. 301 buf = ByteBuffer.allocate(trailerSize);// for trailer. 302 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity()); 303 trailer = WALTrailer.parseFrom(buf.array()); 304 this.walEditsStopOffset = positionOfTrailer; 305 return true; 306 } catch (IOException ioe) { 307 LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe); 308 } 309 return false; 310 } 311 312 protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName, 313 CompressionContext compressionContext) throws IOException { 314 return WALCellCodec.create(conf, cellCodecClsName, compressionContext); 315 } 316 317 @Override 318 protected void initAfterCompression() throws IOException { 319 initAfterCompression(null); 320 } 321 322 @Override 323 protected void initAfterCompression(String cellCodecClsName) throws IOException { 324 WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext); 325 this.cellDecoder = codec.getDecoder(this.inputStream); 326 if (this.hasCompression) { 327 this.byteStringUncompressor = codec.getByteStringUncompressor(); 328 } else { 329 this.byteStringUncompressor = WALCellCodec.getNoneUncompressor(); 330 } 331 } 332 333 @Override 334 protected boolean hasCompression() { 335 return this.hasCompression; 336 } 337 338 @Override 339 protected boolean hasTagCompression() { 340 return this.hasTagCompression; 341 } 342 343 @Override 344 protected boolean hasValueCompression() { 345 return this.hasValueCompression; 346 } 347 348 @Override 349 protected Compression.Algorithm getValueCompressionAlgorithm() { 350 return this.valueCompressionType; 351 } 352 353 @Override 354 protected boolean readNext(Entry entry) throws IOException { 355 // OriginalPosition might be < 0 on local fs; if so, it is useless to us. 356 long originalPosition = this.inputStream.getPos(); 357 if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) { 358 LOG.trace("Reached end of expected edits area at offset {}", originalPosition); 359 return false; 360 } 361 WALKey.Builder builder = WALKey.newBuilder(); 362 long size = 0; 363 boolean resetPosition = false; 364 try { 365 long available = -1; 366 try { 367 int firstByte = this.inputStream.read(); 368 if (firstByte == -1) { 369 throw new EOFException(); 370 } 371 size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); 372 // available may be < 0 on local fs for instance. If so, can't depend on it. 373 available = this.inputStream.available(); 374 if (available > 0 && available < size) { 375 throw new EOFException("Available stream not enough for edit, " 376 + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= " 377 + size + " at offset = " + this.inputStream.getPos()); 378 } 379 ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size); 380 } catch (InvalidProtocolBufferException ipbe) { 381 resetPosition = true; 382 throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" 383 + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize=" 384 + size + ", currentAvailable=" + available).initCause(ipbe); 385 } 386 if (!builder.isInitialized()) { 387 // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. 388 // If we can get the KV count, we could, theoretically, try to get next record. 389 throw new EOFException("Partial PB while reading WAL, " 390 + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos()); 391 } 392 WALKey walKey = builder.build(); 393 entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); 394 if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { 395 LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}", 396 this.inputStream.getPos()); 397 seekOnFs(originalPosition); 398 return false; 399 } 400 int expectedCells = walKey.getFollowingKvCount(); 401 long posBefore = this.inputStream.getPos(); 402 try { 403 int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); 404 if (expectedCells != actualCells) { 405 resetPosition = true; 406 throw new EOFException("Only read " + actualCells); // other info added in catch 407 } 408 } catch (Exception ex) { 409 String posAfterStr = "<unknown>"; 410 try { 411 posAfterStr = this.inputStream.getPos() + ""; 412 } catch (Throwable t) { 413 LOG.trace("Error getting pos for error message - ignoring", t); 414 } 415 String message = " while reading " + expectedCells + " WAL KVs; started reading at " 416 + posBefore + " and read up to " + posAfterStr; 417 IOException realEofEx = extractHiddenEof(ex); 418 throw (EOFException) new EOFException("EOF " + message) 419 .initCause(realEofEx != null ? realEofEx : ex); 420 } 421 if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) { 422 LOG.error( 423 "Read WALTrailer while reading WALEdits. wal: " + this.path + ", inputStream.getPos(): " 424 + this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset); 425 throw new EOFException("Read WALTrailer while reading WALEdits"); 426 } 427 } catch (EOFException eof) { 428 // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs) 429 if (originalPosition < 0) { 430 LOG.debug( 431 "Encountered a malformed edit, but can't seek back to last good position " 432 + "because originalPosition is negative. last offset={}", 433 this.inputStream.getPos(), eof); 434 throw eof; 435 } 436 // If stuck at the same place and we got an exception, lets go back at the beginning. 437 if (inputStream.getPos() == originalPosition) { 438 if (resetPosition) { 439 LOG.debug("Encountered a malformed edit, seeking to the beginning of the WAL since " 440 + "current position and original position match at {}", originalPosition); 441 seekOnFs(0); 442 } else { 443 LOG.debug("EOF at position {}", originalPosition); 444 } 445 } else { 446 // Else restore our position to original location in hope that next time through we will 447 // read successfully. 448 LOG.debug("Encountered a malformed edit, seeking back to last good position in file, " 449 + "from {} to {}", inputStream.getPos(), originalPosition, eof); 450 seekOnFs(originalPosition); 451 } 452 return false; 453 } 454 return true; 455 } 456 457 private IOException extractHiddenEof(Exception ex) { 458 // There are two problems we are dealing with here. Hadoop stream throws generic exception 459 // for EOF, not EOFException; and scanner further hides it inside RuntimeException. 460 IOException ioEx = null; 461 if (ex instanceof EOFException) { 462 return (EOFException) ex; 463 } else if (ex instanceof IOException) { 464 ioEx = (IOException) ex; 465 } else if ( 466 ex instanceof RuntimeException && ex.getCause() != null 467 && ex.getCause() instanceof IOException 468 ) { 469 ioEx = (IOException) ex.getCause(); 470 } 471 if ((ioEx != null) && (ioEx.getMessage() != null)) { 472 if (ioEx.getMessage().contains("EOF")) return ioEx; 473 return null; 474 } 475 return null; 476 } 477 478 @Override 479 protected void seekOnFs(long pos) throws IOException { 480 this.inputStream.seek(pos); 481 } 482}