001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io.hadoopbackport;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InterruptedIOException;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.hadoop.fs.PositionedReadable;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * The ThrottleInputStream provides bandwidth throttling on a specified
032 * InputStream. It is implemented as a wrapper on top of another InputStream
033 * instance.
034 * The throttling works by examining the number of bytes read from the underlying
035 * InputStream from the beginning, and sleep()ing for a time interval if
036 * the byte-transfer is found exceed the specified tolerable maximum.
037 * (Thus, while the read-rate might exceed the maximum for a given short interval,
038 * the average tends towards the specified maximum, overall.)
039 */
040@InterfaceAudience.Private
041public class ThrottledInputStream extends InputStream {
042
043  private final InputStream rawStream;
044  private final long maxBytesPerSec;
045  private final long startTime = System.currentTimeMillis();
046
047  private long bytesRead = 0;
048  private long totalSleepTime = 0;
049
050  public ThrottledInputStream(InputStream rawStream) {
051    this(rawStream, Long.MAX_VALUE);
052  }
053
054  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
055    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
056    this.rawStream = rawStream;
057    this.maxBytesPerSec = maxBytesPerSec;
058  }
059
060  @Override
061  public void close() throws IOException {
062    rawStream.close();
063  }
064
065  /** {@inheritDoc} */
066  @Override
067  public int read() throws IOException {
068    throttle();
069    int data = rawStream.read();
070    if (data != -1) {
071      bytesRead++;
072    }
073    return data;
074  }
075
076  /** {@inheritDoc} */
077  @Override
078  public int read(byte[] b) throws IOException {
079    throttle();
080    int readLen = rawStream.read(b);
081    if (readLen != -1) {
082      bytesRead += readLen;
083    }
084    return readLen;
085  }
086
087  /** {@inheritDoc} */
088  @Override
089  public int read(byte[] b, int off, int len) throws IOException {
090    throttle();
091    int readLen = rawStream.read(b, off, len);
092    if (readLen != -1) {
093      bytesRead += readLen;
094    }
095    return readLen;
096  }
097
098  /**
099   * Read bytes starting from the specified position. This requires rawStream is
100   * an instance of {@link PositionedReadable}.
101   * @param position
102   * @param buffer
103   * @param offset
104   * @param length
105   * @return the number of bytes read
106   */
107  public int read(long position, byte[] buffer, int offset, int length)
108      throws IOException {
109    if (!(rawStream instanceof PositionedReadable)) {
110      throw new UnsupportedOperationException(
111        "positioned read is not supported by the internal stream");
112    }
113    throttle();
114    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
115      offset, length);
116    if (readLen != -1) {
117      bytesRead += readLen;
118    }
119    return readLen;
120  }
121
122  private long calSleepTimeMs() {
123    return calSleepTimeMs(bytesRead, maxBytesPerSec,
124      EnvironmentEdgeManager.currentTime() - startTime);
125  }
126
127  static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) {
128    assert elapsed > 0 : "The elapsed time should be greater than zero";
129    if (bytesRead <= 0 || maxBytesPerSec <= 0) {
130      return 0;
131    }
132    // We use this class to load the single source file, so the bytesRead
133    // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
134    // We can get the precise sleep time by using the double value.
135    long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed);
136    if (rval <= 0) {
137      return 0;
138    } else {
139      return rval;
140    }
141  }
142
143  private void throttle() throws InterruptedIOException {
144    long sleepTime = calSleepTimeMs();
145    totalSleepTime += sleepTime;
146    try {
147      TimeUnit.MILLISECONDS.sleep(sleepTime);
148    } catch (InterruptedException e) {
149      throw new InterruptedIOException("Thread aborted");
150    }
151  }
152
153  /**
154   * Getter for the number of bytes read from this stream, since creation.
155   * @return The number of bytes.
156   */
157  public long getTotalBytesRead() {
158    return bytesRead;
159  }
160
161  /**
162   * Getter for the read-rate from this stream, since creation.
163   * Calculated as bytesRead/elapsedTimeSinceStart.
164   * @return Read rate, in bytes/sec.
165   */
166  public long getBytesPerSec() {
167    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
168    if (elapsed == 0) {
169      return bytesRead;
170    } else {
171      return bytesRead / elapsed;
172    }
173  }
174
175  /**
176   * Getter the total time spent in sleep.
177   * @return Number of milliseconds spent in sleep.
178   */
179  public long getTotalSleepTime() {
180    return totalSleepTime;
181  }
182
183  /** {@inheritDoc} */
184  @Override
185  public String toString() {
186    return "ThrottledInputStream{" +
187        "bytesRead=" + bytesRead +
188        ", maxBytesPerSec=" + maxBytesPerSec +
189        ", bytesPerSec=" + getBytesPerSec() +
190        ", totalSleepTime=" + totalSleepTime +
191        '}';
192  }
193}