View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io.hadoopbackport;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  
24  import org.apache.hadoop.fs.PositionedReadable;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  
27  /**
28   * The ThrottleInputStream provides bandwidth throttling on a specified
29   * InputStream. It is implemented as a wrapper on top of another InputStream
30   * instance.
31   * The throttling works by examining the number of bytes read from the underlying
32   * InputStream from the beginning, and sleep()ing for a time interval if
33   * the byte-transfer is found exceed the specified tolerable maximum.
34   * (Thus, while the read-rate might exceed the maximum for a given short interval,
35   * the average tends towards the specified maximum, overall.)
36   */
37  @InterfaceAudience.Private
38  public class ThrottledInputStream extends InputStream {
39  
40    private final InputStream rawStream;
41    private final long maxBytesPerSec;
42    private final long startTime = System.currentTimeMillis();
43  
44    private long bytesRead = 0;
45    private long totalSleepTime = 0;
46  
47    private static final long SLEEP_DURATION_MS = 50;
48  
49    public ThrottledInputStream(InputStream rawStream) {
50      this(rawStream, Long.MAX_VALUE);
51    }
52  
53    public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
54      assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
55      this.rawStream = rawStream;
56      this.maxBytesPerSec = maxBytesPerSec;
57    }
58  
59    @Override
60    public void close() throws IOException {
61      rawStream.close();
62    }
63  
64    /** @inheritDoc */
65    @Override
66    public int read() throws IOException {
67      throttle();
68      int data = rawStream.read();
69      if (data != -1) {
70        bytesRead++;
71      }
72      return data;
73    }
74  
75    /** @inheritDoc */
76    @Override
77    public int read(byte[] b) throws IOException {
78      throttle();
79      int readLen = rawStream.read(b);
80      if (readLen != -1) {
81        bytesRead += readLen;
82      }
83      return readLen;
84    }
85  
86    /** @inheritDoc */
87    @Override
88    public int read(byte[] b, int off, int len) throws IOException {
89      throttle();
90      int readLen = rawStream.read(b, off, len);
91      if (readLen != -1) {
92        bytesRead += readLen;
93      }
94      return readLen;
95    }
96  
97    /**
98     * Read bytes starting from the specified position. This requires rawStream is
99     * an instance of {@link PositionedReadable}.
100    * @param position
101    * @param buffer
102    * @param offset
103    * @param length
104    * @return the number of bytes read
105    */
106   public int read(long position, byte[] buffer, int offset, int length)
107       throws IOException {
108     if (!(rawStream instanceof PositionedReadable)) {
109       throw new UnsupportedOperationException(
110         "positioned read is not supported by the internal stream");
111     }
112     throttle();
113     int readLen = ((PositionedReadable) rawStream).read(position, buffer,
114       offset, length);
115     if (readLen != -1) {
116       bytesRead += readLen;
117     }
118     return readLen;
119   }
120 
121   private void throttle() throws IOException {
122     while (getBytesPerSec() > maxBytesPerSec) {
123       try {
124         Thread.sleep(SLEEP_DURATION_MS);
125         totalSleepTime += SLEEP_DURATION_MS;
126       } catch (InterruptedException e) {
127         throw new IOException("Thread aborted", e);
128       }
129     }
130   }
131 
132   /**
133    * Getter for the number of bytes read from this stream, since creation.
134    * @return The number of bytes.
135    */
136   public long getTotalBytesRead() {
137     return bytesRead;
138   }
139 
140   /**
141    * Getter for the read-rate from this stream, since creation.
142    * Calculated as bytesRead/elapsedTimeSinceStart.
143    * @return Read rate, in bytes/sec.
144    */
145   public long getBytesPerSec() {
146     long elapsed = (System.currentTimeMillis() - startTime) / 1000;
147     if (elapsed == 0) {
148       return bytesRead;
149     } else {
150       return bytesRead / elapsed;
151     }
152   }
153 
154   /**
155    * Getter the total time spent in sleep.
156    * @return Number of milliseconds spent in sleep.
157    */
158   public long getTotalSleepTime() {
159     return totalSleepTime;
160   }
161 
162   /** @inheritDoc */
163   @Override
164   public String toString() {
165     return "ThrottledInputStream{" +
166         "bytesRead=" + bytesRead +
167         ", maxBytesPerSec=" + maxBytesPerSec +
168         ", bytesPerSec=" + getBytesPerSec() +
169         ", totalSleepTime=" + totalSleepTime +
170         '}';
171   }
172 }