001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.IOException;
021import java.io.InputStream;
022import org.apache.commons.io.IOUtils;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * This class is only used by WAL ValueCompressor for decompression.
029 * <p>
030 * <strong>WARNING: </strong>The implementation is very tricky and does not follow typical
031 * InputStream pattern, so do not use it in any other places.
032 */
033@InterfaceAudience.Private
034class WALDecompressionBoundedDelegatingInputStream extends InputStream {
035
036  private static final Logger LOG =
037    LoggerFactory.getLogger(WALDecompressionBoundedDelegatingInputStream.class);
038
039  private InputStream in;
040
041  private long pos;
042
043  private long limit;
044
045  public void reset(InputStream in, long limit) {
046    this.in = in;
047    this.limit = limit;
048    this.pos = 0;
049  }
050
051  @Override
052  public int read() throws IOException {
053    if (pos >= limit) {
054      return -1;
055    }
056    int result = in.read();
057    if (result < 0) {
058      return -1;
059    }
060    pos++;
061    return result;
062  }
063
064  @Override
065  public int read(byte[] b, int off, int len) throws IOException {
066    if (pos >= limit) {
067      return -1;
068    }
069    int toRead = (int) Math.min(len, limit - pos);
070    int readBytes = IOUtils.read(in, b, off, toRead);
071    // increase pos by however many we actually read
072    pos += readBytes;
073
074    if (readBytes != toRead) {
075      // This is trick here, we will always try to read enough bytes to fill the buffer passed in,
076      // or we reach the end of this compression block, if there are not enough bytes, we just
077      // return -1 to let the upper layer fail with EOF
078      // In WAL value decompression this is OK as if we can not read all the data, we will finally
079      // get an EOF somewhere
080      LOG.debug("Got EOF while we want to read {} bytes from stream, but only read {}", toRead,
081        readBytes);
082      return -1;
083    }
084    return toRead;
085  }
086
087  @Override
088  public long skip(final long len) throws IOException {
089    long skipped = in.skip(Math.min(len, limit - pos));
090    pos += skipped;
091    return skipped;
092  }
093
094  @Override
095  public int available() throws IOException {
096    if (pos >= limit) {
097      return 0;
098    }
099    // Do not call the delegate's available() method. Data in a bounded input stream is assumed
100    // available up to the limit and that is the contract we have with our callers. Regardless
101    // of what we do here, read() and skip() will behave as expected when EOF is encountered if
102    // the underlying stream is closed early or otherwise could not provide enough bytes.
103    // Note: This class is used to supply buffers to compression codecs during WAL tailing and
104    // successful decompression depends on this behavior.
105    return (int) (limit - pos);
106  }
107}