001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hadoopbackport; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InterruptedIOException; 023import java.util.concurrent.TimeUnit; 024import org.apache.hadoop.fs.PositionedReadable; 025import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 026import org.apache.yetus.audience.InterfaceAudience; 027 028/** 029 * The ThrottleInputStream provides bandwidth throttling on a specified InputStream. It is 030 * implemented as a wrapper on top of another InputStream instance. The throttling works by 031 * examining the number of bytes read from the underlying InputStream from the beginning, and 032 * sleep()ing for a time interval if the byte-transfer is found exceed the specified tolerable 033 * maximum. (Thus, while the read-rate might exceed the maximum for a given short interval, the 034 * average tends towards the specified maximum, overall.) 035 */ 036@InterfaceAudience.Private 037public class ThrottledInputStream extends InputStream { 038 039 private final InputStream rawStream; 040 private final long maxBytesPerSec; 041 private final long startTime = EnvironmentEdgeManager.currentTime(); 042 043 private long bytesRead = 0; 044 private long totalSleepTime = 0; 045 046 public ThrottledInputStream(InputStream rawStream) { 047 this(rawStream, Long.MAX_VALUE); 048 } 049 050 public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) { 051 assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 052 this.rawStream = rawStream; 053 this.maxBytesPerSec = maxBytesPerSec; 054 } 055 056 @Override 057 public void close() throws IOException { 058 rawStream.close(); 059 } 060 061 /** {@inheritDoc} */ 062 @Override 063 public int read() throws IOException { 064 throttle(); 065 int data = rawStream.read(); 066 if (data != -1) { 067 bytesRead++; 068 } 069 return data; 070 } 071 072 /** {@inheritDoc} */ 073 @Override 074 public int read(byte[] b) throws IOException { 075 throttle(); 076 int readLen = rawStream.read(b); 077 if (readLen != -1) { 078 bytesRead += readLen; 079 } 080 return readLen; 081 } 082 083 /** {@inheritDoc} */ 084 @Override 085 public int read(byte[] b, int off, int len) throws IOException { 086 throttle(); 087 int readLen = rawStream.read(b, off, len); 088 if (readLen != -1) { 089 bytesRead += readLen; 090 } 091 return readLen; 092 } 093 094 /** 095 * Read bytes starting from the specified position. This requires rawStream is an instance of 096 * {@link PositionedReadable}. 097 * @return the number of bytes read 098 */ 099 public int read(long position, byte[] buffer, int offset, int length) throws IOException { 100 if (!(rawStream instanceof PositionedReadable)) { 101 throw new UnsupportedOperationException( 102 "positioned read is not supported by the internal stream"); 103 } 104 throttle(); 105 int readLen = ((PositionedReadable) rawStream).read(position, buffer, offset, length); 106 if (readLen != -1) { 107 bytesRead += readLen; 108 } 109 return readLen; 110 } 111 112 private long calSleepTimeMs() { 113 return calSleepTimeMs(bytesRead, maxBytesPerSec, 114 EnvironmentEdgeManager.currentTime() - startTime); 115 } 116 117 static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) { 118 assert elapsed > 0 : "The elapsed time should be greater than zero"; 119 if (bytesRead <= 0 || maxBytesPerSec <= 0) { 120 return 0; 121 } 122 // We use this class to load the single source file, so the bytesRead 123 // and maxBytesPerSec aren't greater than Double.MAX_VALUE. 124 // We can get the precise sleep time by using the double value. 125 long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed); 126 if (rval <= 0) { 127 return 0; 128 } else { 129 return rval; 130 } 131 } 132 133 private void throttle() throws InterruptedIOException { 134 long sleepTime = calSleepTimeMs(); 135 totalSleepTime += sleepTime; 136 try { 137 TimeUnit.MILLISECONDS.sleep(sleepTime); 138 } catch (InterruptedException e) { 139 throw new InterruptedIOException("Thread aborted"); 140 } 141 } 142 143 /** 144 * Getter for the number of bytes read from this stream, since creation. 145 * @return The number of bytes. 146 */ 147 public long getTotalBytesRead() { 148 return bytesRead; 149 } 150 151 /** 152 * Getter for the read-rate from this stream, since creation. Calculated as 153 * bytesRead/elapsedTimeSinceStart. 154 * @return Read rate, in bytes/sec. 155 */ 156 public long getBytesPerSec() { 157 long elapsed = (EnvironmentEdgeManager.currentTime() - startTime) / 1000; 158 if (elapsed == 0) { 159 return bytesRead; 160 } else { 161 return bytesRead / elapsed; 162 } 163 } 164 165 /** 166 * Getter the total time spent in sleep. 167 * @return Number of milliseconds spent in sleep. 168 */ 169 public long getTotalSleepTime() { 170 return totalSleepTime; 171 } 172 173 /** {@inheritDoc} */ 174 @Override 175 public String toString() { 176 return "ThrottledInputStream{" + "bytesRead=" + bytesRead + ", maxBytesPerSec=" + maxBytesPerSec 177 + ", bytesPerSec=" + getBytesPerSec() + ", totalSleepTime=" + totalSleepTime + '}'; 178 } 179}