001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hadoopbackport;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InterruptedIOException;
023import java.util.concurrent.TimeUnit;
024import org.apache.hadoop.fs.PositionedReadable;
025import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
026import org.apache.yetus.audience.InterfaceAudience;
027
028/**
029 * The ThrottleInputStream provides bandwidth throttling on a specified InputStream. It is
030 * implemented as a wrapper on top of another InputStream instance. The throttling works by
031 * examining the number of bytes read from the underlying InputStream from the beginning, and
032 * sleep()ing for a time interval if the byte-transfer is found exceed the specified tolerable
033 * maximum. (Thus, while the read-rate might exceed the maximum for a given short interval, the
034 * average tends towards the specified maximum, overall.)
035 */
036@InterfaceAudience.Private
037public class ThrottledInputStream extends InputStream {
038
039  private final InputStream rawStream;
040  private final long maxBytesPerSec;
041  private final long startTime = EnvironmentEdgeManager.currentTime();
042
043  private long bytesRead = 0;
044  private long totalSleepTime = 0;
045
046  public ThrottledInputStream(InputStream rawStream) {
047    this(rawStream, Long.MAX_VALUE);
048  }
049
050  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
051    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
052    this.rawStream = rawStream;
053    this.maxBytesPerSec = maxBytesPerSec;
054  }
055
056  @Override
057  public void close() throws IOException {
058    rawStream.close();
059  }
060
061  /** {@inheritDoc} */
062  @Override
063  public int read() throws IOException {
064    throttle();
065    int data = rawStream.read();
066    if (data != -1) {
067      bytesRead++;
068    }
069    return data;
070  }
071
072  /** {@inheritDoc} */
073  @Override
074  public int read(byte[] b) throws IOException {
075    throttle();
076    int readLen = rawStream.read(b);
077    if (readLen != -1) {
078      bytesRead += readLen;
079    }
080    return readLen;
081  }
082
083  /** {@inheritDoc} */
084  @Override
085  public int read(byte[] b, int off, int len) throws IOException {
086    throttle();
087    int readLen = rawStream.read(b, off, len);
088    if (readLen != -1) {
089      bytesRead += readLen;
090    }
091    return readLen;
092  }
093
094  /**
095   * Read bytes starting from the specified position. This requires rawStream is an instance of
096   * {@link PositionedReadable}. nnnn * @return the number of bytes read
097   */
098  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
099    if (!(rawStream instanceof PositionedReadable)) {
100      throw new UnsupportedOperationException(
101        "positioned read is not supported by the internal stream");
102    }
103    throttle();
104    int readLen = ((PositionedReadable) rawStream).read(position, buffer, offset, length);
105    if (readLen != -1) {
106      bytesRead += readLen;
107    }
108    return readLen;
109  }
110
111  private long calSleepTimeMs() {
112    return calSleepTimeMs(bytesRead, maxBytesPerSec,
113      EnvironmentEdgeManager.currentTime() - startTime);
114  }
115
116  static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) {
117    assert elapsed > 0 : "The elapsed time should be greater than zero";
118    if (bytesRead <= 0 || maxBytesPerSec <= 0) {
119      return 0;
120    }
121    // We use this class to load the single source file, so the bytesRead
122    // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
123    // We can get the precise sleep time by using the double value.
124    long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed);
125    if (rval <= 0) {
126      return 0;
127    } else {
128      return rval;
129    }
130  }
131
132  private void throttle() throws InterruptedIOException {
133    long sleepTime = calSleepTimeMs();
134    totalSleepTime += sleepTime;
135    try {
136      TimeUnit.MILLISECONDS.sleep(sleepTime);
137    } catch (InterruptedException e) {
138      throw new InterruptedIOException("Thread aborted");
139    }
140  }
141
142  /**
143   * Getter for the number of bytes read from this stream, since creation.
144   * @return The number of bytes.
145   */
146  public long getTotalBytesRead() {
147    return bytesRead;
148  }
149
150  /**
151   * Getter for the read-rate from this stream, since creation. Calculated as
152   * bytesRead/elapsedTimeSinceStart.
153   * @return Read rate, in bytes/sec.
154   */
155  public long getBytesPerSec() {
156    long elapsed = (EnvironmentEdgeManager.currentTime() - startTime) / 1000;
157    if (elapsed == 0) {
158      return bytesRead;
159    } else {
160      return bytesRead / elapsed;
161    }
162  }
163
164  /**
165   * Getter the total time spent in sleep.
166   * @return Number of milliseconds spent in sleep.
167   */
168  public long getTotalSleepTime() {
169    return totalSleepTime;
170  }
171
172  /** {@inheritDoc} */
173  @Override
174  public String toString() {
175    return "ThrottledInputStream{" + "bytesRead=" + bytesRead + ", maxBytesPerSec=" + maxBytesPerSec
176      + ", bytesPerSec=" + getBytesPerSec() + ", totalSleepTime=" + totalSleepTime + '}';
177  }
178}