1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hadoopbackport;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23
24 import org.apache.hadoop.fs.PositionedReadable;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26
27
28
29
30
31
32
33
34
35
36
37 @InterfaceAudience.Private
38 public class ThrottledInputStream extends InputStream {
39
40 private final InputStream rawStream;
41 private final long maxBytesPerSec;
42 private final long startTime = System.currentTimeMillis();
43
44 private long bytesRead = 0;
45 private long totalSleepTime = 0;
46
47 private static final long SLEEP_DURATION_MS = 50;
48
49 public ThrottledInputStream(InputStream rawStream) {
50 this(rawStream, Long.MAX_VALUE);
51 }
52
53 public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
54 assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
55 this.rawStream = rawStream;
56 this.maxBytesPerSec = maxBytesPerSec;
57 }
58
59 @Override
60 public void close() throws IOException {
61 rawStream.close();
62 }
63
64
65 @Override
66 public int read() throws IOException {
67 throttle();
68 int data = rawStream.read();
69 if (data != -1) {
70 bytesRead++;
71 }
72 return data;
73 }
74
75
76 @Override
77 public int read(byte[] b) throws IOException {
78 throttle();
79 int readLen = rawStream.read(b);
80 if (readLen != -1) {
81 bytesRead += readLen;
82 }
83 return readLen;
84 }
85
86
87 @Override
88 public int read(byte[] b, int off, int len) throws IOException {
89 throttle();
90 int readLen = rawStream.read(b, off, len);
91 if (readLen != -1) {
92 bytesRead += readLen;
93 }
94 return readLen;
95 }
96
97
98
99
100
101
102
103
104
105
106 public int read(long position, byte[] buffer, int offset, int length)
107 throws IOException {
108 if (!(rawStream instanceof PositionedReadable)) {
109 throw new UnsupportedOperationException(
110 "positioned read is not supported by the internal stream");
111 }
112 throttle();
113 int readLen = ((PositionedReadable) rawStream).read(position, buffer,
114 offset, length);
115 if (readLen != -1) {
116 bytesRead += readLen;
117 }
118 return readLen;
119 }
120
121 private void throttle() throws IOException {
122 while (getBytesPerSec() > maxBytesPerSec) {
123 try {
124 Thread.sleep(SLEEP_DURATION_MS);
125 totalSleepTime += SLEEP_DURATION_MS;
126 } catch (InterruptedException e) {
127 throw new IOException("Thread aborted", e);
128 }
129 }
130 }
131
132
133
134
135
136 public long getTotalBytesRead() {
137 return bytesRead;
138 }
139
140
141
142
143
144
145 public long getBytesPerSec() {
146 long elapsed = (System.currentTimeMillis() - startTime) / 1000;
147 if (elapsed == 0) {
148 return bytesRead;
149 } else {
150 return bytesRead / elapsed;
151 }
152 }
153
154
155
156
157
158 public long getTotalSleepTime() {
159 return totalSleepTime;
160 }
161
162
163 @Override
164 public String toString() {
165 return "ThrottledInputStream{" +
166 "bytesRead=" + bytesRead +
167 ", maxBytesPerSec=" + maxBytesPerSec +
168 ", bytesPerSec=" + getBytesPerSec() +
169 ", totalSleepTime=" + totalSleepTime +
170 '}';
171 }
172 }