/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hadoopbackport;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * The ThrottledInputStream provides bandwidth throttling on a specified
 * InputStream. It is implemented as a wrapper on top of another InputStream
 * instance.
 * The throttling works by examining the number of bytes read from the underlying
 * InputStream from the beginning, and sleep()ing for a time interval if
 * the byte-transfer is found to exceed the specified tolerable maximum.
 * (Thus, while the read-rate might exceed the maximum for a given short interval,
 * the average tends towards the specified maximum, overall.)
 * <p>
 * The byte and sleep counters are plain fields updated without any
 * synchronization.
 */
@InterfaceAudience.Private
public class ThrottledInputStream extends InputStream {

  private final InputStream rawStream;
  private final long maxBytesPerSec;
  // Taken from the same clock that calSleepTimeMs() and getBytesPerSec() read,
  // so that elapsed-time arithmetic never mixes two time sources.
  private final long startTime = EnvironmentEdgeManager.currentTime();

  private long bytesRead = 0;
  private long totalSleepTime = 0;

  /**
   * Wraps the given stream with a limit of {@link Long#MAX_VALUE} bytes per second.
   * @param rawStream the stream to read from
   */
  public ThrottledInputStream(InputStream rawStream) {
    this(rawStream, Long.MAX_VALUE);
  }

  /**
   * Wraps the given stream and limits the average read rate.
   * @param rawStream the stream to read from
   * @param maxBytesPerSec the maximum average number of bytes per second; checked to be
   *          positive by an assertion only
   */
  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
    this.rawStream = rawStream;
    this.maxBytesPerSec = maxBytesPerSec;
  }

  /** Closes the wrapped stream. */
  @Override
  public void close() throws IOException {
    rawStream.close();
  }

  /** {@inheritDoc} */
  @Override
  public int read() throws IOException {
    throttle();
    int data = rawStream.read();
    if (data != -1) {
      bytesRead++;
    }
    return data;
  }

  /** {@inheritDoc} */
  @Override
  public int read(byte[] b) throws IOException {
    throttle();
    int readLen = rawStream.read(b);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  /** {@inheritDoc} */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    throttle();
    int readLen = rawStream.read(b, off, len);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  /**
   * Read bytes starting from the specified position. This requires rawStream is
   * an instance of {@link PositionedReadable}.
   * @param position the position in the underlying stream to start reading from
   * @param buffer the buffer the bytes are read into
   * @param offset the offset in {@code buffer} at which to start storing bytes
   * @param length the maximum number of bytes to read
   * @return the number of bytes read, as reported by the wrapped stream
   * @throws IOException if the wrapped stream fails, or if the throttling sleep is interrupted
   * @throws UnsupportedOperationException if the wrapped stream is not a
   *           {@link PositionedReadable}
   */
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    if (!(rawStream instanceof PositionedReadable)) {
      throw new UnsupportedOperationException(
        "positioned read is not supported by the internal stream");
    }
    throttle();
    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
      offset, length);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  /**
   * Computes how long the next read has to be delayed, based on the bytes read so far and
   * the time elapsed since this stream was created.
   * @return the delay in milliseconds, never negative
   */
  private long calSleepTimeMs() {
    return calSleepTimeMs(bytesRead, maxBytesPerSec,
      EnvironmentEdgeManager.currentTime() - startTime);
  }

  /**
   * Computes the delay needed to bring the average rate down to {@code maxBytesPerSec}.
   * @param bytesRead the number of bytes read so far
   * @param maxBytesPerSec the allowed average rate, in bytes per second
   * @param elapsed the milliseconds elapsed since reading started
   * @return the time, in milliseconds, by which reading {@code bytesRead} bytes at
   *         {@code maxBytesPerSec} exceeds {@code elapsed}; 0 if it does not, or if
   *         {@code bytesRead} or {@code maxBytesPerSec} is not positive
   */
  @VisibleForTesting
  static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) {
    // An elapsed time of zero is legitimate: the first read may happen within the same
    // millisecond as construction. Only a negative value indicates a problem.
    assert elapsed >= 0 : "The elapsed time should not be negative";
    if (bytesRead <= 0 || maxBytesPerSec <= 0) {
      return 0;
    }
    // We use this class to load the single source file, so the bytesRead
    // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
    // We can get the precise sleep time by using the double value.
    long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed);
    return Math.max(rval, 0);
  }

  /**
   * Sleeps for as long as needed to keep the average read rate at or below the limit, and
   * adds that time to the total sleep time.
   * @throws InterruptedIOException if the thread is interrupted while sleeping
   */
  private void throttle() throws InterruptedIOException {
    long sleepTime = calSleepTimeMs();
    totalSleepTime += sleepTime;
    try {
      TimeUnit.MILLISECONDS.sleep(sleepTime);
    } catch (InterruptedException e) {
      // InterruptedIOException has no constructor taking a cause, so attach it explicitly
      // instead of dropping the original exception.
      InterruptedIOException iioe = new InterruptedIOException("Thread aborted");
      iioe.initCause(e);
      throw iioe;
    }
  }

  /**
   * Getter for the number of bytes read from this stream, since creation.
   * @return The number of bytes.
   */
  public long getTotalBytesRead() {
    return bytesRead;
  }

  /**
   * Getter for the read-rate from this stream, since creation.
   * Calculated as bytesRead/elapsedTimeSinceStart, with the elapsed time truncated to
   * whole seconds.
   * @return Read rate, in bytes/sec. While less than one second has elapsed, the number of
   *         bytes read so far is returned.
   */
  public long getBytesPerSec() {
    long elapsed = (EnvironmentEdgeManager.currentTime() - startTime) / 1000;
    if (elapsed == 0) {
      return bytesRead;
    } else {
      return bytesRead / elapsed;
    }
  }

  /**
   * Getter for the total time spent in sleep.
   * @return Number of milliseconds spent in sleep.
   */
  public long getTotalSleepTime() {
    return totalSleepTime;
  }

  /** {@inheritDoc} */
  @Override
  public String toString() {
    return "ThrottledInputStream{" +
      "bytesRead=" + bytesRead +
      ", maxBytesPerSec=" + maxBytesPerSec +
      ", bytesPerSec=" + getBytesPerSec() +
      ", totalSleepTime=" + totalSleepTime +
      '}';
  }
}