001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hadoopbackport; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InterruptedIOException; 023import java.util.concurrent.TimeUnit; 024import org.apache.hadoop.fs.PositionedReadable; 025import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 026import org.apache.yetus.audience.InterfaceAudience; 027 028/** 029 * The ThrottleInputStream provides bandwidth throttling on a specified InputStream. It is 030 * implemented as a wrapper on top of another InputStream instance. The throttling works by 031 * examining the number of bytes read from the underlying InputStream from the beginning, and 032 * sleep()ing for a time interval if the byte-transfer is found exceed the specified tolerable 033 * maximum. (Thus, while the read-rate might exceed the maximum for a given short interval, the 034 * average tends towards the specified maximum, overall.) 035 */ 036@InterfaceAudience.Private 037public class ThrottledInputStream extends InputStream { 038 039 private final InputStream rawStream; 040 private final long maxBytesPerSec; 041 private final long startTime = EnvironmentEdgeManager.currentTime(); 042 043 private long bytesRead = 0; 044 private long totalSleepTime = 0; 045 046 public ThrottledInputStream(InputStream rawStream) { 047 this(rawStream, Long.MAX_VALUE); 048 } 049 050 public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) { 051 assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 052 this.rawStream = rawStream; 053 this.maxBytesPerSec = maxBytesPerSec; 054 } 055 056 @Override 057 public void close() throws IOException { 058 rawStream.close(); 059 } 060 061 /** {@inheritDoc} */ 062 @Override 063 public int read() throws IOException { 064 throttle(); 065 int data = rawStream.read(); 066 if (data != -1) { 067 bytesRead++; 068 } 069 return data; 070 } 071 072 /** {@inheritDoc} */ 073 @Override 074 public int read(byte[] b) throws IOException { 075 throttle(); 076 int readLen = rawStream.read(b); 077 if (readLen != -1) { 078 bytesRead += readLen; 079 } 080 return readLen; 081 } 082 083 /** {@inheritDoc} */ 084 @Override 085 public int read(byte[] b, int off, int len) throws IOException { 086 throttle(); 087 int readLen = rawStream.read(b, off, len); 088 if (readLen != -1) { 089 bytesRead += readLen; 090 } 091 return readLen; 092 } 093 094 /** 095 * Read bytes starting from the specified position. This requires rawStream is an instance of 096 * {@link PositionedReadable}. nnnn * @return the number of bytes read 097 */ 098 public int read(long position, byte[] buffer, int offset, int length) throws IOException { 099 if (!(rawStream instanceof PositionedReadable)) { 100 throw new UnsupportedOperationException( 101 "positioned read is not supported by the internal stream"); 102 } 103 throttle(); 104 int readLen = ((PositionedReadable) rawStream).read(position, buffer, offset, length); 105 if (readLen != -1) { 106 bytesRead += readLen; 107 } 108 return readLen; 109 } 110 111 private long calSleepTimeMs() { 112 return calSleepTimeMs(bytesRead, maxBytesPerSec, 113 EnvironmentEdgeManager.currentTime() - startTime); 114 } 115 116 static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) { 117 assert elapsed > 0 : "The elapsed time should be greater than zero"; 118 if (bytesRead <= 0 || maxBytesPerSec <= 0) { 119 return 0; 120 } 121 // We use this class to load the single source file, so the bytesRead 122 // and maxBytesPerSec aren't greater than Double.MAX_VALUE. 123 // We can get the precise sleep time by using the double value. 124 long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed); 125 if (rval <= 0) { 126 return 0; 127 } else { 128 return rval; 129 } 130 } 131 132 private void throttle() throws InterruptedIOException { 133 long sleepTime = calSleepTimeMs(); 134 totalSleepTime += sleepTime; 135 try { 136 TimeUnit.MILLISECONDS.sleep(sleepTime); 137 } catch (InterruptedException e) { 138 throw new InterruptedIOException("Thread aborted"); 139 } 140 } 141 142 /** 143 * Getter for the number of bytes read from this stream, since creation. 144 * @return The number of bytes. 145 */ 146 public long getTotalBytesRead() { 147 return bytesRead; 148 } 149 150 /** 151 * Getter for the read-rate from this stream, since creation. Calculated as 152 * bytesRead/elapsedTimeSinceStart. 153 * @return Read rate, in bytes/sec. 154 */ 155 public long getBytesPerSec() { 156 long elapsed = (EnvironmentEdgeManager.currentTime() - startTime) / 1000; 157 if (elapsed == 0) { 158 return bytesRead; 159 } else { 160 return bytesRead / elapsed; 161 } 162 } 163 164 /** 165 * Getter the total time spent in sleep. 166 * @return Number of milliseconds spent in sleep. 167 */ 168 public long getTotalSleepTime() { 169 return totalSleepTime; 170 } 171 172 /** {@inheritDoc} */ 173 @Override 174 public String toString() { 175 return "ThrottledInputStream{" + "bytesRead=" + bytesRead + ", maxBytesPerSec=" + maxBytesPerSec 176 + ", bytesPerSec=" + getBytesPerSec() + ", totalSleepTime=" + totalSleepTime + '}'; 177 } 178}