001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.io.hadoopbackport; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InterruptedIOException; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.hadoop.fs.PositionedReadable; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * The ThrottleInputStream provides bandwidth throttling on a specified 032 * InputStream. It is implemented as a wrapper on top of another InputStream 033 * instance. 034 * The throttling works by examining the number of bytes read from the underlying 035 * InputStream from the beginning, and sleep()ing for a time interval if 036 * the byte-transfer is found exceed the specified tolerable maximum. 037 * (Thus, while the read-rate might exceed the maximum for a given short interval, 038 * the average tends towards the specified maximum, overall.) 039 */ 040@InterfaceAudience.Private 041public class ThrottledInputStream extends InputStream { 042 043 private final InputStream rawStream; 044 private final long maxBytesPerSec; 045 private final long startTime = System.currentTimeMillis(); 046 047 private long bytesRead = 0; 048 private long totalSleepTime = 0; 049 050 public ThrottledInputStream(InputStream rawStream) { 051 this(rawStream, Long.MAX_VALUE); 052 } 053 054 public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) { 055 assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 056 this.rawStream = rawStream; 057 this.maxBytesPerSec = maxBytesPerSec; 058 } 059 060 @Override 061 public void close() throws IOException { 062 rawStream.close(); 063 } 064 065 /** {@inheritDoc} */ 066 @Override 067 public int read() throws IOException { 068 throttle(); 069 int data = rawStream.read(); 070 if (data != -1) { 071 bytesRead++; 072 } 073 return data; 074 } 075 076 /** {@inheritDoc} */ 077 @Override 078 public int read(byte[] b) throws IOException { 079 throttle(); 080 int readLen = rawStream.read(b); 081 if (readLen != -1) { 082 bytesRead += readLen; 083 } 084 return readLen; 085 } 086 087 /** {@inheritDoc} */ 088 @Override 089 public int read(byte[] b, int off, int len) throws IOException { 090 throttle(); 091 int readLen = rawStream.read(b, off, len); 092 if (readLen != -1) { 093 bytesRead += readLen; 094 } 095 return readLen; 096 } 097 098 /** 099 * Read bytes starting from the specified position. This requires rawStream is 100 * an instance of {@link PositionedReadable}. 101 * @param position 102 * @param buffer 103 * @param offset 104 * @param length 105 * @return the number of bytes read 106 */ 107 public int read(long position, byte[] buffer, int offset, int length) 108 throws IOException { 109 if (!(rawStream instanceof PositionedReadable)) { 110 throw new UnsupportedOperationException( 111 "positioned read is not supported by the internal stream"); 112 } 113 throttle(); 114 int readLen = ((PositionedReadable) rawStream).read(position, buffer, 115 offset, length); 116 if (readLen != -1) { 117 bytesRead += readLen; 118 } 119 return readLen; 120 } 121 122 private long calSleepTimeMs() { 123 return calSleepTimeMs(bytesRead, maxBytesPerSec, 124 EnvironmentEdgeManager.currentTime() - startTime); 125 } 126 127 static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) { 128 assert elapsed > 0 : "The elapsed time should be greater than zero"; 129 if (bytesRead <= 0 || maxBytesPerSec <= 0) { 130 return 0; 131 } 132 // We use this class to load the single source file, so the bytesRead 133 // and maxBytesPerSec aren't greater than Double.MAX_VALUE. 134 // We can get the precise sleep time by using the double value. 135 long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed); 136 if (rval <= 0) { 137 return 0; 138 } else { 139 return rval; 140 } 141 } 142 143 private void throttle() throws InterruptedIOException { 144 long sleepTime = calSleepTimeMs(); 145 totalSleepTime += sleepTime; 146 try { 147 TimeUnit.MILLISECONDS.sleep(sleepTime); 148 } catch (InterruptedException e) { 149 throw new InterruptedIOException("Thread aborted"); 150 } 151 } 152 153 /** 154 * Getter for the number of bytes read from this stream, since creation. 155 * @return The number of bytes. 156 */ 157 public long getTotalBytesRead() { 158 return bytesRead; 159 } 160 161 /** 162 * Getter for the read-rate from this stream, since creation. 163 * Calculated as bytesRead/elapsedTimeSinceStart. 164 * @return Read rate, in bytes/sec. 165 */ 166 public long getBytesPerSec() { 167 long elapsed = (System.currentTimeMillis() - startTime) / 1000; 168 if (elapsed == 0) { 169 return bytesRead; 170 } else { 171 return bytesRead / elapsed; 172 } 173 } 174 175 /** 176 * Getter the total time spent in sleep. 177 * @return Number of milliseconds spent in sleep. 178 */ 179 public long getTotalSleepTime() { 180 return totalSleepTime; 181 } 182 183 /** {@inheritDoc} */ 184 @Override 185 public String toString() { 186 return "ThrottledInputStream{" + 187 "bytesRead=" + bytesRead + 188 ", maxBytesPerSec=" + maxBytesPerSec + 189 ", bytesPerSec=" + getBytesPerSec() + 190 ", totalSleepTime=" + totalSleepTime + 191 '}'; 192 } 193}