001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io.hadoopbackport;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InterruptedIOException;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.hadoop.fs.PositionedReadable;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.apache.yetus.audience.InterfaceAudience;
029
030import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
031
032/**
033 * The ThrottleInputStream provides bandwidth throttling on a specified
034 * InputStream. It is implemented as a wrapper on top of another InputStream
035 * instance.
036 * The throttling works by examining the number of bytes read from the underlying
037 * InputStream from the beginning, and sleep()ing for a time interval if
038 * the byte-transfer is found exceed the specified tolerable maximum.
039 * (Thus, while the read-rate might exceed the maximum for a given short interval,
040 * the average tends towards the specified maximum, overall.)
041 */
042@InterfaceAudience.Private
043public class ThrottledInputStream extends InputStream {
044
045  private final InputStream rawStream;
046  private final long maxBytesPerSec;
047  private final long startTime = System.currentTimeMillis();
048
049  private long bytesRead = 0;
050  private long totalSleepTime = 0;
051
052  public ThrottledInputStream(InputStream rawStream) {
053    this(rawStream, Long.MAX_VALUE);
054  }
055
056  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
057    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
058    this.rawStream = rawStream;
059    this.maxBytesPerSec = maxBytesPerSec;
060  }
061
062  @Override
063  public void close() throws IOException {
064    rawStream.close();
065  }
066
067  /** {@inheritDoc} */
068  @Override
069  public int read() throws IOException {
070    throttle();
071    int data = rawStream.read();
072    if (data != -1) {
073      bytesRead++;
074    }
075    return data;
076  }
077
078  /** {@inheritDoc} */
079  @Override
080  public int read(byte[] b) throws IOException {
081    throttle();
082    int readLen = rawStream.read(b);
083    if (readLen != -1) {
084      bytesRead += readLen;
085    }
086    return readLen;
087  }
088
089  /** {@inheritDoc} */
090  @Override
091  public int read(byte[] b, int off, int len) throws IOException {
092    throttle();
093    int readLen = rawStream.read(b, off, len);
094    if (readLen != -1) {
095      bytesRead += readLen;
096    }
097    return readLen;
098  }
099
100  /**
101   * Read bytes starting from the specified position. This requires rawStream is
102   * an instance of {@link PositionedReadable}.
103   * @param position
104   * @param buffer
105   * @param offset
106   * @param length
107   * @return the number of bytes read
108   */
109  public int read(long position, byte[] buffer, int offset, int length)
110      throws IOException {
111    if (!(rawStream instanceof PositionedReadable)) {
112      throw new UnsupportedOperationException(
113        "positioned read is not supported by the internal stream");
114    }
115    throttle();
116    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
117      offset, length);
118    if (readLen != -1) {
119      bytesRead += readLen;
120    }
121    return readLen;
122  }
123
124  private long calSleepTimeMs() {
125    return calSleepTimeMs(bytesRead, maxBytesPerSec,
126      EnvironmentEdgeManager.currentTime() - startTime);
127  }
128
129  @VisibleForTesting
130  static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) {
131    assert elapsed > 0 : "The elapsed time should be greater than zero";
132    if (bytesRead <= 0 || maxBytesPerSec <= 0) {
133      return 0;
134    }
135    // We use this class to load the single source file, so the bytesRead
136    // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
137    // We can get the precise sleep time by using the double value.
138    long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed);
139    if (rval <= 0) {
140      return 0;
141    } else {
142      return rval;
143    }
144  }
145
146  private void throttle() throws InterruptedIOException {
147    long sleepTime = calSleepTimeMs();
148    totalSleepTime += sleepTime;
149    try {
150      TimeUnit.MILLISECONDS.sleep(sleepTime);
151    } catch (InterruptedException e) {
152      throw new InterruptedIOException("Thread aborted");
153    }
154  }
155
156  /**
157   * Getter for the number of bytes read from this stream, since creation.
158   * @return The number of bytes.
159   */
160  public long getTotalBytesRead() {
161    return bytesRead;
162  }
163
164  /**
165   * Getter for the read-rate from this stream, since creation.
166   * Calculated as bytesRead/elapsedTimeSinceStart.
167   * @return Read rate, in bytes/sec.
168   */
169  public long getBytesPerSec() {
170    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
171    if (elapsed == 0) {
172      return bytesRead;
173    } else {
174      return bytesRead / elapsed;
175    }
176  }
177
178  /**
179   * Getter the total time spent in sleep.
180   * @return Number of milliseconds spent in sleep.
181   */
182  public long getTotalSleepTime() {
183    return totalSleepTime;
184  }
185
186  /** {@inheritDoc} */
187  @Override
188  public String toString() {
189    return "ThrottledInputStream{" +
190        "bytesRead=" + bytesRead +
191        ", maxBytesPerSec=" + maxBytesPerSec +
192        ", bytesPerSec=" + getBytesPerSec() +
193        ", totalSleepTime=" + totalSleepTime +
194        '}';
195  }
196}