/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.security.Key;
import java.security.KeyException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Decryptor;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
 * Base class for protobuf-based WAL readers.
 */
@InterfaceAudience.Private
public abstract class AbstractProtobufWALReader
  implements AbstractFSWALProvider.Initializer, Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufWALReader.class);

  // public for WALFactory until we move everything to o.a.h.h.wal
  public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");

  // public for TestWALSplit
  public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");

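  // A rough sketch of the on-disk layout implied by readHeader() and setTrailerIfPresent() below
  // (illustrative; everything except the two magics and the trailing int is variable length):
  //
  //   PB_WAL_MAGIC ("PWAL") | delimited WALProtos.WALHeader | WAL entries ... |
  //   WALTrailer (protobuf) | int trailerSize | PB_WAL_COMPLETE_MAGIC ("LAWP")
  //
  // The trailer block at the end is optional, e.g. it may still be missing while the file is being
  // written or after a crash; setTrailerIfPresent() then leaves walEditsStopOffset at fileLength.
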
  /**
   * Configuration name for the WAL trailer warning size. If a WALTrailer's size is greater than
   * the configured size, providers should log a warning; e.g. this is used with the protobuf
   * reader/writer.
   */
  static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
  static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
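  // For example, the warning threshold can be raised programmatically (a minimal sketch; the same
  // key can also be set in hbase-site.xml):
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(WAL_TRAILER_WARN_SIZE, 2 * 1024 * 1024); // only warn for trailers above 2MB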

  private static final List<String> WRITER_CLS_NAMES = ImmutableList.of(
    ProtobufLogWriter.class.getSimpleName(), AsyncProtobufLogWriter.class.getSimpleName(),
    "SecureProtobufLogWriter", "SecureAsyncProtobufLogWriter");

  protected Configuration conf;

  protected FileSystem fs;

  protected Path path;

  protected long fileLength;

  protected FSDataInputStream inputStream;

  protected CompressionContext compressionCtx;
  protected boolean hasCompression = false;
  protected boolean hasTagCompression = false;
  protected boolean hasValueCompression = false;
  protected Compression.Algorithm valueCompressionType;

  protected Codec.Decoder cellDecoder;
  protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;

  protected long walEditsStopOffset;
  protected boolean trailerPresent;
  protected WALTrailer trailer;
  // Warning threshold for the WAL trailer size, in bytes. A trailer larger than this is still
  // written/read, but with a WARN message in the log.
  protected int trailerWarnSize;

  // cell codec classname
  protected String codecClsName;

  protected Decryptor decryptor;

  /**
   * Get or create the input stream used by the cell decoder.
   * <p/>
   * For implementing replication, we may need to limit the bytes we can read, so here we provide a
   * method so subclasses can wrap the original input stream.
   */
  protected abstract InputStream getCellCodecInputStream(FSDataInputStream stream);

  /**
   * Skip to the given position.
   */
  protected abstract void skipTo(long position) throws IOException;

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, long startPosition)
    throws IOException {
    this.conf = conf;
    this.path = path;
    this.fs = fs;
    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);

    Pair<FSDataInputStream, FileStatus> pair = open();
    FSDataInputStream stream = pair.getFirst();
    FileStatus stat = pair.getSecond();
    boolean initSucceeded = false;
    try {
      // read the header
      WALProtos.WALHeader header = readHeader(stream);
      // initialize metadata and fields
      initDecryptor(header);
      initCompression(header);
      initWALCellCodec(header, getCellCodecInputStream(stream));

      // read trailer if available
      readTrailer(stream, stat);

      // Assigning the field only here is intentional, as we do not want the above methods to use
      // the inputStream field. For the tailing reader implementation we need to wrap the input
      // stream when creating the cell decoder, so we have to make sure the above methods do not
      // accidentally use the stored inputStream directly and cause trouble. If a method needs an
      // input stream, we pass it in explicitly, as for readHeader and readTrailer.
      this.inputStream = stream;

      // seek to the given position if it is non-negative
      if (startPosition >= 0 && startPosition != inputStream.getPos()) {
        if (compressionCtx != null) {
          // skip to the position, as we need to construct the compression dictionary
          skipTo(startPosition);
        } else {
          // just seek to the position
          stream.seek(startPosition);
        }
      }
      initSucceeded = true;
    } finally {
      if (!initSucceeded) {
        Closeables.close(stream, initSucceeded);
        inputStream = null;
      }
    }
  }
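
  // A minimal usage sketch (illustrative only; in practice readers are created and initialized via
  // WALFactory/AbstractFSWALProvider, which pick the concrete subclass):
  //   AbstractProtobufWALReader reader = ...; // some concrete subclass
  //   reader.init(fs, walPath, conf, -1);     // a negative startPosition means "do not seek"
  //   ...                                     // read entries through the subclass API
  //   reader.close();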

  private Pair<FSDataInputStream, FileStatus> openArchivedWAL() throws IOException {
    Path archivedWAL = AbstractFSWALProvider.findArchivedLog(path, conf);
    if (archivedWAL != null) {
      // try opening from the old WALs dir
      return Pair.newPair(fs.open(archivedWAL), fs.getFileStatus(archivedWAL));
    } else {
      return null;
    }
  }

  protected final Pair<FSDataInputStream, FileStatus> open() throws IOException {
    try {
      return Pair.newPair(fs.open(path), fs.getFileStatus(path));
    } catch (FileNotFoundException e) {
      Pair<FSDataInputStream, FileStatus> pair = openArchivedWAL();
      if (pair != null) {
        return pair;
      } else {
        throw e;
      }
    } catch (RemoteException re) {
      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
      if (!(ioe instanceof FileNotFoundException)) {
        throw ioe;
      }
      Pair<FSDataInputStream, FileStatus> pair = openArchivedWAL();
      if (pair != null) {
        return pair;
      } else {
        throw ioe;
      }
    }
  }

  protected final WALProtos.WALHeader readHeader(FSDataInputStream stream) throws IOException {
    byte[] magic = new byte[PB_WAL_MAGIC.length];
    try {
      stream.readFully(magic);
    } catch (EOFException e) {
      throw new WALHeaderEOFException("EOF while reading PB WAL magic", e);
    }
    if (!Arrays.equals(PB_WAL_MAGIC, magic)) {
      throw new IOException("Invalid PB WAL magic " + Bytes.toStringBinary(magic) + ", expected "
        + Bytes.toStringBinary(PB_WAL_MAGIC));
    }
    WALProtos.WALHeader header;
    try {
      header = ProtobufUtil.parseDelimitedFrom(stream, WALProtos.WALHeader.parser());
    } catch (InvalidProtocolBufferException e) {
      if (ProtobufUtil.isEOF(e)) {
        throw new WALHeaderEOFException("EOF while reading PB header", e);
      } else {
        throw e;
      }
    } catch (EOFException e) {
      throw new WALHeaderEOFException("EOF while reading PB header", e);
    }
    if (header == null) {
      throw new WALHeaderEOFException("EOF while reading PB header");
    }
    if (header.hasWriterClsName() && !getWriterClsNames().contains(header.getWriterClsName())) {
      throw new IOException("Got unknown writer class: " + header.getWriterClsName());
    }
    return header;
  }

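  // Key resolution order used by initDecryptor() below (all attempts go through
  // EncryptionUtil.unwrapWALKey):
  //   1. the WAL-specific key named by HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY, if configured;
  //   2. the cluster master key named by HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, defaulting to
  //      the current user's short name;
  //   3. the alternate master key named by HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY,
  //      if configured.
  // If none of these can unwrap the key recorded in the WAL header, init fails with an IOException.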
  private void initDecryptor(WALProtos.WALHeader header) throws IOException {
    if (!header.hasEncryptionKey()) {
      return;
    }
    EncryptionTest.testKeyProvider(conf);
    EncryptionTest.testCipherProvider(conf);

    // Retrieve a usable key
    byte[] keyBytes = header.getEncryptionKey().toByteArray();
    Key key = null;
    String walKeyName = conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY);
    // First try the WAL key, if one is configured
    if (walKeyName != null) {
      try {
        key = EncryptionUtil.unwrapWALKey(conf, walKeyName, keyBytes);
      } catch (KeyException e) {
        LOG.debug("Unable to unwrap key with WAL key '{}'", walKeyName, e);
        key = null;
      }
    }
    if (key == null) {
      String masterKeyName =
        conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName());
      try {
        // Then, try the cluster master key
        key = EncryptionUtil.unwrapWALKey(conf, masterKeyName, keyBytes);
      } catch (KeyException e) {
        // If the current master key fails to unwrap, try the alternate, if
        // one is configured
        LOG.debug("Unable to unwrap key with current master key '{}'", masterKeyName, e);
        String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
        if (alternateKeyName != null) {
          try {
            key = EncryptionUtil.unwrapWALKey(conf, alternateKeyName, keyBytes);
          } catch (KeyException ex) {
            throw new IOException(ex);
          }
        } else {
          throw new IOException(e);
        }
      }
    }

    // Use the algorithm the key wants
    Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
    if (cipher == null) {
      throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
    }

    // Set up the decryptor for this WAL
    decryptor = cipher.getDecryptor();
    decryptor.setKey(key);

    LOG.debug("Initialized secure protobuf WAL: cipher={}", cipher.getName());
  }

  private void initCompression(WALProtos.WALHeader header) throws IOException {
    this.hasCompression = header.hasHasCompression() && header.getHasCompression();
    if (!hasCompression) {
      return;
    }
    this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
    this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression();
    if (header.hasValueCompressionAlgorithm()) {
      try {
        this.valueCompressionType =
          Compression.Algorithm.values()[header.getValueCompressionAlgorithm()];
      } catch (ArrayIndexOutOfBoundsException e) {
        throw new IOException("Invalid compression type", e);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Initializing compression context for {}: isRecoveredEdits={}"
          + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}",
        path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression, hasValueCompression,
        valueCompressionType);
    }
    try {
      compressionCtx =
        new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
          hasTagCompression, hasValueCompression, valueCompressionType);
    } catch (Exception e) {
      throw new IOException("Failed to initialize CompressionContext", e);
    }
  }

  private WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
    CompressionContext compressionContext) throws IOException {
    return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
  }

  protected final void initWALCellCodec(WALProtos.WALHeader header, InputStream inputStream)
    throws IOException {
    String cellCodecClsName = header.hasCellCodecClsName() ? header.getCellCodecClsName() : null;
    if (decryptor != null && SecureWALCellCodec.class.getName().equals(cellCodecClsName)) {
      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor);
      this.cellDecoder = codec.getDecoder(inputStream);
      // We do not support compression with WAL encryption
      this.compressionCtx = null;
      this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
      this.hasCompression = false;
      this.hasTagCompression = false;
      this.hasValueCompression = false;
    } else {
      WALCellCodec codec = getCodec(conf, cellCodecClsName, compressionCtx);
      this.cellDecoder = codec.getDecoder(inputStream);
      if (this.hasCompression) {
        this.byteStringUncompressor = codec.getByteStringUncompressor();
      } else {
        this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
      }
    }
    this.codecClsName = cellCodecClsName;
  }

  protected final void readTrailer(FSDataInputStream stream, FileStatus stat) throws IOException {
    this.fileLength = stat.getLen();
    this.walEditsStopOffset = this.fileLength;
    long currentPos = stream.getPos();
    // we will reset walEditsStopOffset if a trailer is available
    trailerPresent = setTrailerIfPresent(stream);
    if (currentPos != stream.getPos()) {
      // seek back
      stream.seek(currentPos);
    }
  }

  /**
   * To check whether a trailer is present in a WAL, this seeks to position (fileLength -
   * PB_WAL_COMPLETE_MAGIC.length - Bytes.SIZEOF_INT). It reads the int value there as the size of
   * the trailer, and checks whether the trailer is present by comparing the last
   * PB_WAL_COMPLETE_MAGIC.length bytes against the magic. If the trailer is not present, it
   * returns false; otherwise it sets the trailer and moves this.walEditsStopOffset to the point
   * just before the trailer.
   * <p/>
   * The trailer is ignored in case:
   * <ul>
   * <li>fileLength is 0 or not correct (when the file is under recovery, etc).
   * <li>the trailer size is negative.
   * </ul>
   * In case the trailer size > this.trailerWarnSize, it is still read, after a WARN message.
   * @return true if a valid trailer is present
   */
  private boolean setTrailerIfPresent(FSDataInputStream stream) throws IOException {
    try {
      long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
      if (trailerSizeOffset <= 0) {
        // no trailer possible.
        return false;
      }
      stream.seek(trailerSizeOffset);
      // read the int as trailer size.
      int trailerSize = stream.readInt();
      ByteBuffer buf = ByteBuffer.allocate(PB_WAL_COMPLETE_MAGIC.length);
      stream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
      if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
        LOG.trace("No trailer found.");
        return false;
      }
      if (trailerSize < 0) {
        LOG.warn("Invalid trailer size " + trailerSize + ", ignoring the trailer");
        return false;
      } else if (trailerSize > this.trailerWarnSize) {
        // continue reading after warning the user.
        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
          + trailerSize + " > " + this.trailerWarnSize);
      }
      // seek to the position where the trailer starts.
      long positionOfTrailer = trailerSizeOffset - trailerSize;
      stream.seek(positionOfTrailer);
      // read the trailer.
      buf = ByteBuffer.allocate(trailerSize); // for trailer.
      stream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
      trailer = WALTrailer.parseFrom(buf.array());
      this.walEditsStopOffset = positionOfTrailer;
      return true;
    } catch (IOException ioe) {
      LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
    }
    return false;
  }
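
  // Worked example for setTrailerIfPresent() (illustrative numbers): for a 1000-byte WAL whose
  // serialized trailer is 20 bytes, trailerSizeOffset = 1000 - (4 + 4) = 992; the int at offset 992
  // holds 20, the "LAWP" magic occupies offsets 996-999, the trailer itself spans offsets 972-991,
  // and walEditsStopOffset becomes 972.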

  protected final boolean reachWALEditsStopOffset(long pos) {
    if (trailerPresent && pos > 0 && pos == walEditsStopOffset) {
      LOG.trace("Reached end of expected edits area at offset {}", pos);
      return true;
    } else {
      return false;
    }
  }

  /**
   * Returns names of the accepted writer classes
   */
  public List<String> getWriterClsNames() {
    return WRITER_CLS_NAMES;
  }

  /**
   * Returns the cell codec classname
   */
  public String getCodecClsName() {
    return codecClsName;
  }

  public long getPosition() throws IOException {
    return inputStream != null ? inputStream.getPos() : -1;
  }

  public long trailerSize() {
    if (trailerPresent) {
      // sizeof PB_WAL_COMPLETE_MAGIC + sizeof trailerSize + trailer
      final long calculatedSize =
        (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
      final long expectedSize = fileLength - walEditsStopOffset;
      if (expectedSize != calculatedSize) {
        LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we "
          + "calculate it as being {}", expectedSize, calculatedSize);
      }
      return expectedSize;
    } else {
      return -1L;
    }
  }

  protected final String getPositionQuietly() {
    try {
      long pos = getPosition();
      return pos >= 0 ? Long.toString(pos) : "<unknown>";
    } catch (Exception e) {
      LOG.warn("failed to get position, ignoring", e);
      return "<unknown>";
    }
  }

  protected final IOException extractHiddenEof(Exception ex) {
    // There are two problems we are dealing with here. Hadoop stream throws generic exception
    // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
    IOException ioEx = null;
    if (ex instanceof EOFException) {
      return (EOFException) ex;
    } else if (ex instanceof IOException) {
      ioEx = (IOException) ex;
    } else if (
      ex instanceof RuntimeException && ex.getCause() != null
        && ex.getCause() instanceof IOException
    ) {
      ioEx = (IOException) ex.getCause();
    }
    if ((ioEx != null) && (ioEx.getMessage() != null)) {
      if (ioEx.getMessage().contains("EOF")) {
        return ioEx;
      }
      return null;
    }
    return null;
  }

  /**
   * This is used to determine whether we have already reached the WALTrailer. As the trailer size
   * and magic are at the end of the WAL file, it is possible that these two parts are still
   * missing while the file is being written, in which case we consider that there is no trailer.
   * And when we actually reach the WALTrailer, we will try to decode it as a WALKey and fail, but
   * the error can vary since what we are actually parsing is a WALTrailer.
   * @return whether this is a WALTrailer and we should throw an EOF to the upper layer to signal
   *         that the file is done
   */
  protected final boolean isWALTrailer(long startPosition) throws IOException {
    // We have nothing in the WALTrailer PB message now, so the trailer region is just an int length
    // (whose value is 0) and the magic at the end
    int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
    if (fileLength - startPosition >= trailerSize) {
      // We still have at least trailerSize bytes before reaching the EOF, so this is not a partial
      // trailer. We also test for == here because if this were a valid, complete trailer we would
      // have read it while opening the reader, so we should not reach here
      return false;
    }
    inputStream.seek(startPosition);
    for (int i = 0; i < 4; i++) {
      int r = inputStream.read();
      if (r == -1) {
        // we have reached EOF while reading the length, and all bytes read are 0, so we assume this
        // is a partial trailer
        return true;
      }
      if (r != 0) {
        // the length is not 0, should not be a trailer
        return false;
      }
    }
    for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
      int r = inputStream.read();
      if (r == -1) {
        // we have reached EOF while reading the magic, and all bytes read so far matched, so we
        // assume this is a partial trailer
        return true;
      }
      if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
        // does not match the magic, should not be a trailer
        return false;
      }
    }
    // in fact, we should not reach here, as this means the trailer bytes are all matched and
    // complete, so we should not have called this method...
    return true;
  }

  @Override
  public void close() {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
      inputStream = null;
    }
  }
}