001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.Closeable; 021import java.io.EOFException; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InputStream; 025import java.nio.ByteBuffer; 026import java.security.Key; 027import java.security.KeyException; 028import java.util.Arrays; 029import java.util.List; 030import org.apache.commons.io.IOUtils; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FSDataInputStream; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.codec.Codec; 038import org.apache.hadoop.hbase.io.compress.Compression; 039import org.apache.hadoop.hbase.io.crypto.Cipher; 040import org.apache.hadoop.hbase.io.crypto.Decryptor; 041import org.apache.hadoop.hbase.io.crypto.Encryption; 042import org.apache.hadoop.hbase.io.util.LRUDictionary; 043import org.apache.hadoop.hbase.security.EncryptionUtil; 044import org.apache.hadoop.hbase.security.User; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.CommonFSUtils; 047import org.apache.hadoop.hbase.util.EncryptionTest; 048import org.apache.hadoop.hbase.util.Pair; 049import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 050import org.apache.hadoop.ipc.RemoteException; 051import org.apache.yetus.audience.InterfaceAudience; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 056import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 057import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 058 059import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; 062 063/** 064 * Base class for reading protobuf based wal reader 065 */ 066@InterfaceAudience.Private 067public abstract class AbstractProtobufWALReader 068 implements AbstractFSWALProvider.Initializer, Closeable { 069 070 private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufWALReader.class); 071 072 // public for WALFactory until we move everything to o.a.h.h.wal 073 public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL"); 074 075 // public for TestWALSplit 076 public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP"); 077 078 /** 079 * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the 080 * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer. 081 */ 082 static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size"; 083 static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB 084 085 private static final List<String> WRITER_CLS_NAMES = ImmutableList.of( 086 ProtobufLogWriter.class.getSimpleName(), AsyncProtobufLogWriter.class.getSimpleName(), 087 "SecureProtobufLogWriter", "SecureAsyncProtobufLogWriter"); 088 089 protected Configuration conf; 090 091 protected FileSystem fs; 092 093 protected Path path; 094 095 protected long fileLength; 096 097 protected FSDataInputStream inputStream; 098 099 protected CompressionContext compressionCtx; 100 protected boolean hasCompression = false; 101 protected boolean hasTagCompression = false; 102 protected boolean hasValueCompression = false; 103 protected Compression.Algorithm valueCompressionType; 104 105 protected Codec.Decoder cellDecoder; 106 protected WALCellCodec.ByteStringUncompressor byteStringUncompressor; 107 108 protected long walEditsStopOffset; 109 protected boolean trailerPresent; 110 protected WALTrailer trailer; 111 // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger 112 // than this size, it is written/read respectively, with a WARN message in the log. 113 protected int trailerWarnSize; 114 115 // cell codec classname 116 protected String codecClsName; 117 118 protected Decryptor decryptor; 119 120 /** 121 * Get or create the input stream used by cell decoder. 122 * <p/> 123 * For implementing replication, we may need to limit the bytes we can read, so here we provide a 124 * method so subclasses can wrap the original input stream. 125 */ 126 protected abstract InputStream getCellCodecInputStream(FSDataInputStream stream); 127 128 /** 129 * Skip to the given position. 130 */ 131 protected abstract void skipTo(long position) throws IOException; 132 133 @Override 134 public void init(FileSystem fs, Path path, Configuration conf, long startPosition) 135 throws IOException { 136 this.conf = conf; 137 this.path = path; 138 this.fs = fs; 139 this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); 140 141 Pair<FSDataInputStream, FileStatus> pair = open(); 142 FSDataInputStream stream = pair.getFirst(); 143 FileStatus stat = pair.getSecond(); 144 boolean initSucceeded = false; 145 try { 146 // read the header 147 WALProtos.WALHeader header = readHeader(stream); 148 // initialize metadata and fields 149 initDecryptor(header); 150 initCompression(header); 151 initWALCellCodec(header, getCellCodecInputStream(stream)); 152 153 // read trailer if available 154 readTrailer(stream, stat); 155 156 // this is intentional as we do not want the above methods to use the inputStream field. For 157 // implementation tailing reader, we need to wrap the input stream when creating cell decoder, 158 // so we need to make sure in the above methods we do not accidentally use the stored 159 // inputStream directly and cause trouble. If a method needs to use an input stream, we just 160 // pass the input stream in, like readHeader and readTrailer. 161 this.inputStream = stream; 162 163 // seek to the given position if it is not -1 164 if (startPosition >= 0 && startPosition != inputStream.getPos()) { 165 if (compressionCtx != null) { 166 // skip to the position, as we need to construct the compression dictionary 167 skipTo(startPosition); 168 } else { 169 // just seek to the position 170 stream.seek(startPosition); 171 } 172 } 173 initSucceeded = true; 174 } finally { 175 if (!initSucceeded) { 176 Closeables.close(stream, initSucceeded); 177 inputStream = null; 178 } 179 } 180 } 181 182 private Pair<FSDataInputStream, FileStatus> openArchivedWAL() throws IOException { 183 Path archivedWAL = AbstractFSWALProvider.findArchivedLog(path, conf); 184 if (archivedWAL != null) { 185 // try open from oldWAL dir 186 return Pair.newPair(fs.open(archivedWAL), fs.getFileStatus(archivedWAL)); 187 } else { 188 return null; 189 } 190 } 191 192 protected final Pair<FSDataInputStream, FileStatus> open() throws IOException { 193 try { 194 return Pair.newPair(fs.open(path), fs.getFileStatus(path)); 195 } catch (FileNotFoundException e) { 196 Pair<FSDataInputStream, FileStatus> pair = openArchivedWAL(); 197 if (pair != null) { 198 return pair; 199 } else { 200 throw e; 201 } 202 } catch (RemoteException re) { 203 IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); 204 if (!(ioe instanceof FileNotFoundException)) { 205 throw ioe; 206 } 207 Pair<FSDataInputStream, FileStatus> pair = openArchivedWAL(); 208 if (pair != null) { 209 return pair; 210 } else { 211 throw ioe; 212 } 213 } 214 } 215 216 protected final WALProtos.WALHeader readHeader(FSDataInputStream stream) throws IOException { 217 byte[] magic = new byte[PB_WAL_MAGIC.length]; 218 try { 219 stream.readFully(magic); 220 } catch (EOFException e) { 221 throw new WALHeaderEOFException("EOF while reading PB WAL magic", e); 222 } 223 if (!Arrays.equals(PB_WAL_MAGIC, magic)) { 224 throw new IOException("Invalid PB WAL magic " + Bytes.toStringBinary(magic) + ", expected " 225 + Bytes.toStringBinary(PB_WAL_MAGIC)); 226 } 227 WALProtos.WALHeader header; 228 try { 229 header = ProtobufUtil.parseDelimitedFrom(stream, WALProtos.WALHeader.parser()); 230 } catch (InvalidProtocolBufferException e) { 231 if (ProtobufUtil.isEOF(e)) { 232 throw new WALHeaderEOFException("EOF while reading PB header", e); 233 } else { 234 throw e; 235 } 236 } catch (EOFException e) { 237 throw new WALHeaderEOFException("EOF while reading PB header", e); 238 } 239 if (header == null) { 240 throw new WALHeaderEOFException("EOF while reading PB header"); 241 } 242 if (header.hasWriterClsName() && !getWriterClsNames().contains(header.getWriterClsName())) { 243 throw new IOException("Got unknown writer class: " + header.getWriterClsName()); 244 } 245 return header; 246 } 247 248 private void initDecryptor(WALProtos.WALHeader header) throws IOException { 249 if (!header.hasEncryptionKey()) { 250 return; 251 } 252 EncryptionTest.testKeyProvider(conf); 253 EncryptionTest.testCipherProvider(conf); 254 255 // Retrieve a usable key 256 byte[] keyBytes = header.getEncryptionKey().toByteArray(); 257 Key key = null; 258 String walKeyName = conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY); 259 // First try the WAL key, if one is configured 260 if (walKeyName != null) { 261 try { 262 key = EncryptionUtil.unwrapWALKey(conf, walKeyName, keyBytes); 263 } catch (KeyException e) { 264 LOG.debug("Unable to unwrap key with WAL key '{}'", walKeyName, e); 265 key = null; 266 } 267 } 268 if (key == null) { 269 String masterKeyName = 270 conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()); 271 try { 272 // Then, try the cluster master key 273 key = EncryptionUtil.unwrapWALKey(conf, masterKeyName, keyBytes); 274 } catch (KeyException e) { 275 // If the current master key fails to unwrap, try the alternate, if 276 // one is configured 277 LOG.debug("Unable to unwrap key with current master key '{}'", masterKeyName, e); 278 String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY); 279 if (alternateKeyName != null) { 280 try { 281 key = EncryptionUtil.unwrapWALKey(conf, alternateKeyName, keyBytes); 282 } catch (KeyException ex) { 283 throw new IOException(ex); 284 } 285 } else { 286 throw new IOException(e); 287 } 288 } 289 } 290 291 // Use the algorithm the key wants 292 Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm()); 293 if (cipher == null) { 294 throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available"); 295 } 296 297 // Set up the decryptor for this WAL 298 299 decryptor = cipher.getDecryptor(); 300 decryptor.setKey(key); 301 302 LOG.debug("Initialized secure protobuf WAL: cipher={}", cipher.getName()); 303 } 304 305 private void initCompression(WALProtos.WALHeader header) throws IOException { 306 this.hasCompression = header.hasHasCompression() && header.getHasCompression(); 307 if (!hasCompression) { 308 return; 309 } 310 this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression(); 311 this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression(); 312 if (header.hasValueCompressionAlgorithm()) { 313 try { 314 this.valueCompressionType = 315 Compression.Algorithm.values()[header.getValueCompressionAlgorithm()]; 316 } catch (ArrayIndexOutOfBoundsException e) { 317 throw new IOException("Invalid compression type", e); 318 } 319 } 320 if (LOG.isDebugEnabled()) { 321 LOG.debug( 322 "Initializing compression context for {}: isRecoveredEdits={}" 323 + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", 324 path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression, hasValueCompression, 325 valueCompressionType); 326 } 327 try { 328 compressionCtx = 329 new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path), 330 hasTagCompression, hasValueCompression, valueCompressionType); 331 } catch (Exception e) { 332 throw new IOException("Failed to initialize CompressionContext", e); 333 } 334 } 335 336 private WALCellCodec getCodec(Configuration conf, String cellCodecClsName, 337 CompressionContext compressionContext) throws IOException { 338 return WALCellCodec.create(conf, cellCodecClsName, compressionContext); 339 } 340 341 protected final void initWALCellCodec(WALProtos.WALHeader header, InputStream inputStream) 342 throws IOException { 343 String cellCodecClsName = header.hasCellCodecClsName() ? header.getCellCodecClsName() : null; 344 if (decryptor != null && SecureWALCellCodec.class.getName().equals(cellCodecClsName)) { 345 WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor); 346 this.cellDecoder = codec.getDecoder(inputStream); 347 // We do not support compression with WAL encryption 348 this.compressionCtx = null; 349 this.byteStringUncompressor = WALCellCodec.getNoneUncompressor(); 350 this.hasCompression = false; 351 this.hasTagCompression = false; 352 this.hasValueCompression = false; 353 } else { 354 WALCellCodec codec = getCodec(conf, cellCodecClsName, compressionCtx); 355 this.cellDecoder = codec.getDecoder(inputStream); 356 if (this.hasCompression) { 357 this.byteStringUncompressor = codec.getByteStringUncompressor(); 358 } else { 359 this.byteStringUncompressor = WALCellCodec.getNoneUncompressor(); 360 } 361 } 362 this.codecClsName = cellCodecClsName; 363 } 364 365 protected final void readTrailer(FSDataInputStream stream, FileStatus stat) throws IOException { 366 this.fileLength = stat.getLen(); 367 this.walEditsStopOffset = this.fileLength; 368 long currentPos = stream.getPos(); 369 // we will reset walEditsStopOffset if trailer is available 370 trailerPresent = setTrailerIfPresent(stream); 371 if (currentPos != stream.getPos()) { 372 // seek back 373 stream.seek(currentPos); 374 } 375 } 376 377 /** 378 * To check whether a trailer is present in a WAL, it seeks to position (fileLength - 379 * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of 380 * the trailer, and checks whether the trailer is present at the end or not by comparing the last 381 * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false; 382 * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just 383 * before the trailer. 384 * <p/> 385 * The trailer is ignored in case: 386 * <ul> 387 * <li>fileLength is 0 or not correct (when file is under recovery, etc). 388 * <li>the trailer size is negative. 389 * </ul> 390 * In case the trailer size > this.trailerMaxSize, it is read after a WARN message. 391 * @return true if a valid trailer is present 392 */ 393 private boolean setTrailerIfPresent(FSDataInputStream stream) throws IOException { 394 try { 395 long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT); 396 if (trailerSizeOffset <= 0) { 397 // no trailer possible. 398 return false; 399 } 400 stream.seek(trailerSizeOffset); 401 // read the int as trailer size. 402 int trailerSize = stream.readInt(); 403 ByteBuffer buf = ByteBuffer.allocate(PB_WAL_COMPLETE_MAGIC.length); 404 stream.readFully(buf.array(), buf.arrayOffset(), buf.capacity()); 405 if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) { 406 LOG.trace("No trailer found."); 407 return false; 408 } 409 if (trailerSize < 0) { 410 LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer"); 411 return false; 412 } else if (trailerSize > this.trailerWarnSize) { 413 // continue reading after warning the user. 414 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : " 415 + trailerSize + " > " + this.trailerWarnSize); 416 } 417 // seek to the position where trailer starts. 418 long positionOfTrailer = trailerSizeOffset - trailerSize; 419 stream.seek(positionOfTrailer); 420 // read the trailer. 421 buf = ByteBuffer.allocate(trailerSize);// for trailer. 422 stream.readFully(buf.array(), buf.arrayOffset(), buf.capacity()); 423 trailer = WALTrailer.parseFrom(buf.array()); 424 this.walEditsStopOffset = positionOfTrailer; 425 return true; 426 } catch (IOException ioe) { 427 LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe); 428 } 429 return false; 430 } 431 432 protected final boolean reachWALEditsStopOffset(long pos) { 433 if (trailerPresent && pos > 0 && pos == walEditsStopOffset) { 434 LOG.trace("Reached end of expected edits area at offset {}", pos); 435 return true; 436 } else { 437 return false; 438 } 439 } 440 441 /** 442 * Returns names of the accepted writer classes 443 */ 444 public List<String> getWriterClsNames() { 445 return WRITER_CLS_NAMES; 446 } 447 448 /** 449 * Returns the cell codec classname 450 */ 451 public String getCodecClsName() { 452 return codecClsName; 453 } 454 455 public long getPosition() throws IOException { 456 return inputStream != null ? inputStream.getPos() : -1; 457 } 458 459 public long trailerSize() { 460 if (trailerPresent) { 461 // sizeof PB_WAL_COMPLETE_MAGIC + sizeof trailerSize + trailer 462 final long calculatedSize = 463 (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize(); 464 final long expectedSize = fileLength - walEditsStopOffset; 465 if (expectedSize != calculatedSize) { 466 LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we " 467 + "calculate it as being {}", expectedSize, calculatedSize); 468 } 469 return expectedSize; 470 } else { 471 return -1L; 472 } 473 } 474 475 protected final String getPositionQuietly() { 476 try { 477 long pos = getPosition(); 478 return pos >= 0 ? Long.toString(pos) : "<unknown>"; 479 } catch (Exception e) { 480 LOG.warn("failed to get position, ignoring", e); 481 return "<unknown>"; 482 } 483 } 484 485 protected final IOException extractHiddenEof(Exception ex) { 486 // There are two problems we are dealing with here. Hadoop stream throws generic exception 487 // for EOF, not EOFException; and scanner further hides it inside RuntimeException. 488 IOException ioEx = null; 489 if (ex instanceof EOFException) { 490 return (EOFException) ex; 491 } else if (ex instanceof IOException) { 492 ioEx = (IOException) ex; 493 } else if ( 494 ex instanceof RuntimeException && ex.getCause() != null 495 && ex.getCause() instanceof IOException 496 ) { 497 ioEx = (IOException) ex.getCause(); 498 } 499 if ((ioEx != null) && (ioEx.getMessage() != null)) { 500 if (ioEx.getMessage().contains("EOF")) { 501 return ioEx; 502 } 503 return null; 504 } 505 return null; 506 } 507 508 /** 509 * This is used to determine whether we have already reached the WALTrailer. As the size and magic 510 * are at the end of the WAL file, it is possible that these two options are missing while 511 * writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we 512 * will try to decode it as WALKey and we will fail but the error could be varied as it is parsing 513 * WALTrailer actually. 514 * @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done 515 */ 516 protected final boolean isWALTrailer(long startPosition) throws IOException { 517 // We have nothing in the WALTrailer PB message now so its size is just an int length size and a 518 // magic at the end 519 int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT; 520 if (fileLength - startPosition >= trailerSize) { 521 // We still have more than trailerSize bytes before reaching the EOF so this is not a trailer. 522 // We also test for == here because if this is a valid trailer, we can read it while opening 523 // the reader, so we should not reach here 524 return false; 525 } 526 inputStream.seek(startPosition); 527 for (int i = 0; i < 4; i++) { 528 int r = inputStream.read(); 529 if (r == -1) { 530 // we have reached EOF while reading the length, and all bytes read are 0, so we assume this 531 // is a partial trailer 532 return true; 533 } 534 if (r != 0) { 535 // the length is not 0, should not be a trailer 536 return false; 537 } 538 } 539 for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) { 540 int r = inputStream.read(); 541 if (r == -1) { 542 // we have reached EOF while reading the magic, and all bytes read are matched, so we assume 543 // this is a partial trailer 544 return true; 545 } 546 if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) { 547 // does not match magic, should not be a trailer 548 return false; 549 } 550 } 551 // in fact, we should not reach here, as this means the trailer bytes are all matched and 552 // complete, then we should not call this method... 553 return true; 554 } 555 556 @Override 557 public void close() { 558 if (inputStream != null) { 559 IOUtils.closeQuietly(inputStream); 560 inputStream = null; 561 } 562 } 563}