001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.IOException;
021import java.io.InputStream;
022import org.apache.yetus.audience.InterfaceAudience;
023
024/**
025 * This is a stream that will only supply bytes from its delegate up to a certain limit. When there
026 * is an attempt to set the position beyond that it will signal that the input is finished.
027 */
028@InterfaceAudience.Private
029public class BoundedDelegatingInputStream extends DelegatingInputStream {
030
031  protected long limit;
032  protected long pos;
033
034  public BoundedDelegatingInputStream(InputStream in, long limit) {
035    super(in);
036    this.limit = limit;
037    this.pos = 0;
038  }
039
040  public void setDelegate(InputStream in, long limit) {
041    this.in = in;
042    this.limit = limit;
043    this.pos = 0;
044  }
045
046  /**
047   * Call the delegate's {@code read()} method if the current position is less than the limit.
048   * @return the byte read or -1 if the end of stream or the limit has been reached.
049   */
050  @Override
051  public int read() throws IOException {
052    if (pos >= limit) {
053      return -1;
054    }
055    int result = in.read();
056    pos++;
057    return result;
058  }
059
060  /**
061   * Call the delegate's {@code read(byte[], int, int)} method if the current position is less than
062   * the limit.
063   * @param b   read buffer
064   * @param off Start offset
065   * @param len The number of bytes to read
066   * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
067   */
068  @Override
069  public int read(final byte[] b, final int off, final int len) throws IOException {
070    if (pos >= limit) {
071      return -1;
072    }
073    long readLen = Math.min(len, limit - pos);
074    int read = in.read(b, off, (int) readLen);
075    if (read < 0) {
076      return -1;
077    }
078    pos += read;
079    return read;
080  }
081
082  /**
083   * Call the delegate's {@code skip(long)} method.
084   * @param len the number of bytes to skip
085   * @return the actual number of bytes skipped
086   */
087  @Override
088  public long skip(final long len) throws IOException {
089    long skipped = in.skip(Math.min(len, limit - pos));
090    pos += skipped;
091    return skipped;
092  }
093
094  /**
095   * @return the remaining bytes within the bound if the current position is less than the limit, or
096   *         0 otherwise.
097   */
098  @Override
099  public int available() throws IOException {
100    if (pos >= limit) {
101      return 0;
102    }
103    // Do not call the delegate's available() method. Data in a bounded input stream is assumed
104    // available up to the limit and that is the contract we have with our callers. Regardless
105    // of what we do here, read() and skip() will behave as expected when EOF is encountered if
106    // the underlying stream is closed early or otherwise could not provide enough bytes.
107    // Note: This class is used to supply buffers to compression codecs during WAL tailing and
108    // successful decompression depends on this behavior.
109    return (int) (limit - pos);
110  }
111
112}