001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hadoopbackport;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InterruptedIOException;
023import java.util.concurrent.TimeUnit;
024import org.apache.hadoop.fs.PositionedReadable;
025import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
026import org.apache.yetus.audience.InterfaceAudience;
027
028/**
029 * The ThrottleInputStream provides bandwidth throttling on a specified InputStream. It is
030 * implemented as a wrapper on top of another InputStream instance. The throttling works by
031 * examining the number of bytes read from the underlying InputStream from the beginning, and
032 * sleep()ing for a time interval if the byte-transfer is found exceed the specified tolerable
033 * maximum. (Thus, while the read-rate might exceed the maximum for a given short interval, the
034 * average tends towards the specified maximum, overall.)
035 */
036@InterfaceAudience.Private
037public class ThrottledInputStream extends InputStream {
038
039  private final InputStream rawStream;
040  private final long maxBytesPerSec;
041  private final long startTime = EnvironmentEdgeManager.currentTime();
042
043  private long bytesRead = 0;
044  private long totalSleepTime = 0;
045
046  public ThrottledInputStream(InputStream rawStream) {
047    this(rawStream, Long.MAX_VALUE);
048  }
049
050  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
051    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
052    this.rawStream = rawStream;
053    this.maxBytesPerSec = maxBytesPerSec;
054  }
055
056  @Override
057  public void close() throws IOException {
058    rawStream.close();
059  }
060
061  /** {@inheritDoc} */
062  @Override
063  public int read() throws IOException {
064    throttle();
065    int data = rawStream.read();
066    if (data != -1) {
067      bytesRead++;
068    }
069    return data;
070  }
071
072  /** {@inheritDoc} */
073  @Override
074  public int read(byte[] b) throws IOException {
075    throttle();
076    int readLen = rawStream.read(b);
077    if (readLen != -1) {
078      bytesRead += readLen;
079    }
080    return readLen;
081  }
082
083  /** {@inheritDoc} */
084  @Override
085  public int read(byte[] b, int off, int len) throws IOException {
086    throttle();
087    int readLen = rawStream.read(b, off, len);
088    if (readLen != -1) {
089      bytesRead += readLen;
090    }
091    return readLen;
092  }
093
094  /**
095   * Read bytes starting from the specified position. This requires rawStream is an instance of
096   * {@link PositionedReadable}.
097   * @return the number of bytes read
098   */
099  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
100    if (!(rawStream instanceof PositionedReadable)) {
101      throw new UnsupportedOperationException(
102        "positioned read is not supported by the internal stream");
103    }
104    throttle();
105    int readLen = ((PositionedReadable) rawStream).read(position, buffer, offset, length);
106    if (readLen != -1) {
107      bytesRead += readLen;
108    }
109    return readLen;
110  }
111
112  private long calSleepTimeMs() {
113    return calSleepTimeMs(bytesRead, maxBytesPerSec,
114      EnvironmentEdgeManager.currentTime() - startTime);
115  }
116
117  static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) {
118    assert elapsed > 0 : "The elapsed time should be greater than zero";
119    if (bytesRead <= 0 || maxBytesPerSec <= 0) {
120      return 0;
121    }
122    // We use this class to load the single source file, so the bytesRead
123    // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
124    // We can get the precise sleep time by using the double value.
125    long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed);
126    if (rval <= 0) {
127      return 0;
128    } else {
129      return rval;
130    }
131  }
132
133  private void throttle() throws InterruptedIOException {
134    long sleepTime = calSleepTimeMs();
135    totalSleepTime += sleepTime;
136    try {
137      TimeUnit.MILLISECONDS.sleep(sleepTime);
138    } catch (InterruptedException e) {
139      throw new InterruptedIOException("Thread aborted");
140    }
141  }
142
143  /**
144   * Getter for the number of bytes read from this stream, since creation.
145   * @return The number of bytes.
146   */
147  public long getTotalBytesRead() {
148    return bytesRead;
149  }
150
151  /**
152   * Getter for the read-rate from this stream, since creation. Calculated as
153   * bytesRead/elapsedTimeSinceStart.
154   * @return Read rate, in bytes/sec.
155   */
156  public long getBytesPerSec() {
157    long elapsed = (EnvironmentEdgeManager.currentTime() - startTime) / 1000;
158    if (elapsed == 0) {
159      return bytesRead;
160    } else {
161      return bytesRead / elapsed;
162    }
163  }
164
165  /**
166   * Getter the total time spent in sleep.
167   * @return Number of milliseconds spent in sleep.
168   */
169  public long getTotalSleepTime() {
170    return totalSleepTime;
171  }
172
173  /** {@inheritDoc} */
174  @Override
175  public String toString() {
176    return "ThrottledInputStream{" + "bytesRead=" + bytesRead + ", maxBytesPerSec=" + maxBytesPerSec
177      + ", bytesPerSec=" + getBytesPerSec() + ", totalSleepTime=" + totalSleepTime + '}';
178  }
179}