001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.IOException; 021import java.io.InputStream; 022import org.apache.yetus.audience.InterfaceAudience; 023 024/** 025 * This is a stream that will only supply bytes from its delegate up to a certain limit. When there 026 * is an attempt to set the position beyond that it will signal that the input is finished. 027 */ 028@InterfaceAudience.Private 029public class BoundedDelegatingInputStream extends DelegatingInputStream { 030 031 protected long limit; 032 protected long pos; 033 034 public BoundedDelegatingInputStream(InputStream in, long limit) { 035 super(in); 036 this.limit = limit; 037 this.pos = 0; 038 } 039 040 public void setDelegate(InputStream in, long limit) { 041 this.in = in; 042 this.limit = limit; 043 this.pos = 0; 044 } 045 046 /** 047 * Call the delegate's {@code read()} method if the current position is less than the limit. 048 * @return the byte read or -1 if the end of stream or the limit has been reached. 049 */ 050 @Override 051 public int read() throws IOException { 052 if (pos >= limit) { 053 return -1; 054 } 055 int result = in.read(); 056 pos++; 057 return result; 058 } 059 060 /** 061 * Call the delegate's {@code read(byte[], int, int)} method if the current position is less than 062 * the limit. 063 * @param b read buffer 064 * @param off Start offset 065 * @param len The number of bytes to read 066 * @return the number of bytes read or -1 if the end of stream or the limit has been reached. 067 */ 068 @Override 069 public int read(final byte[] b, final int off, final int len) throws IOException { 070 if (pos >= limit) { 071 return -1; 072 } 073 long readLen = Math.min(len, limit - pos); 074 int read = in.read(b, off, (int) readLen); 075 if (read < 0) { 076 return -1; 077 } 078 pos += read; 079 return read; 080 } 081 082 /** 083 * Call the delegate's {@code skip(long)} method. 084 * @param len the number of bytes to skip 085 * @return the actual number of bytes skipped 086 */ 087 @Override 088 public long skip(final long len) throws IOException { 089 long skipped = in.skip(Math.min(len, limit - pos)); 090 pos += skipped; 091 return skipped; 092 } 093 094 /** 095 * @return the remaining bytes within the bound if the current position is less than the limit, or 096 * 0 otherwise. 097 */ 098 @Override 099 public int available() throws IOException { 100 if (pos >= limit) { 101 return 0; 102 } 103 // Do not call the delegate's available() method. Data in a bounded input stream is assumed 104 // available up to the limit and that is the contract we have with our callers. Regardless 105 // of what we do here, read() and skip() will behave as expected when EOF is encountered if 106 // the underlying stream is closed early or otherwise could not provide enough bytes. 107 // Note: This class is used to supply buffers to compression codecs during WAL tailing and 108 // successful decompression depends on this behavior. 109 return (int) (limit - pos); 110 } 111 112}