001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.ArrayList; 023import java.util.Iterator; 024import java.util.List; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Executors; 027import java.util.concurrent.Future; 028import java.util.function.BiConsumer; 029import org.apache.hadoop.hbase.nio.ByteBuff; 030import org.apache.hadoop.util.StringUtils; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential 037 * and could be considered as a large buffer.It supports reading/writing data from this large buffer 038 * with a position and offset 039 */ 040@InterfaceAudience.Private 041public class ByteBufferArray { 042 private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class); 043 044 public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; 045 private final int bufferSize; 046 private final int bufferCount; 047 final ByteBuffer[] buffers; 048 049 /** 050 * We allocate a number of byte buffers as the capacity. 051 * @param capacity total size of the byte buffer array 052 * @param allocator the ByteBufferAllocator that will create the buffers 053 * @throws IOException throws IOException if there is an exception thrown by the allocator 054 */ 055 public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException { 056 this(getBufferSize(capacity), getBufferCount(capacity), 057 Runtime.getRuntime().availableProcessors(), capacity, allocator); 058 } 059 060 ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity, 061 ByteBufferAllocator alloc) throws IOException { 062 this.bufferSize = bufferSize; 063 this.bufferCount = bufferCount; 064 LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}", 065 StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount); 066 this.buffers = new ByteBuffer[bufferCount]; 067 createBuffers(threadCount, alloc); 068 } 069 070 private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException { 071 ExecutorService pool = Executors.newFixedThreadPool(threadCount); 072 int perThreadCount = bufferCount / threadCount; 073 int reminder = bufferCount % threadCount; 074 try { 075 List<Future<ByteBuffer[]>> futures = new ArrayList<>(threadCount); 076 // Dispatch the creation task to each thread. 077 for (int i = 0; i < threadCount; i++) { 078 final int chunkSize = perThreadCount + ((i == threadCount - 1) ? reminder : 0); 079 futures.add(pool.submit(() -> { 080 ByteBuffer[] chunk = new ByteBuffer[chunkSize]; 081 for (int k = 0; k < chunkSize; k++) { 082 chunk[k] = alloc.allocate(bufferSize); 083 } 084 return chunk; 085 })); 086 } 087 // Append the buffers created by each thread. 088 int bufferIndex = 0; 089 try { 090 for (Future<ByteBuffer[]> f : futures) { 091 for (ByteBuffer b : f.get()) { 092 this.buffers[bufferIndex++] = b; 093 } 094 } 095 assert bufferIndex == bufferCount; 096 } catch (Exception e) { 097 LOG.error("Buffer creation interrupted", e); 098 throw new IOException(e); 099 } 100 } finally { 101 pool.shutdownNow(); 102 } 103 } 104 105 static int getBufferSize(long capacity) { 106 int bufferSize = DEFAULT_BUFFER_SIZE; 107 if (bufferSize > (capacity / 16)) { 108 bufferSize = (int) roundUp(capacity / 16, 32768); 109 } 110 return bufferSize; 111 } 112 113 private static int getBufferCount(long capacity) { 114 int bufferSize = getBufferSize(capacity); 115 return (int) (roundUp(capacity, bufferSize) / bufferSize); 116 } 117 118 private static long roundUp(long n, long to) { 119 return ((n + to - 1) / to) * to; 120 } 121 122 /** 123 * Transfers bytes from this buffers array into the given destination {@link ByteBuff} 124 * @param offset start position in this big logical array. 125 * @param dst the destination ByteBuff. Notice that its position will be advanced. 126 * @return number of bytes read 127 */ 128 public int read(long offset, ByteBuff dst) { 129 return internalTransfer(offset, dst, READER); 130 } 131 132 /** 133 * Transfers bytes from the given source {@link ByteBuff} into this buffer array 134 * @param offset start offset of this big logical array. 135 * @param src the source ByteBuff. Notice that its position will be advanced. 136 * @return number of bytes write 137 */ 138 public int write(long offset, ByteBuff src) { 139 return internalTransfer(offset, src, WRITER); 140 } 141 142 /** 143 * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both 144 * source and destination will be advanced. 145 */ 146 @SuppressWarnings("UnnecessaryLambda") 147 private static final BiConsumer<ByteBuffer, ByteBuff> WRITER = (dst, src) -> { 148 int off = src.position(), len = dst.remaining(); 149 src.get(dst, off, len); 150 src.position(off + len); 151 }; 152 153 /** 154 * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}, Position of both 155 * source and destination will be advanced. 156 */ 157 @SuppressWarnings("UnnecessaryLambda") 158 private static final BiConsumer<ByteBuffer, ByteBuff> READER = (src, dst) -> { 159 int off = dst.position(), len = src.remaining(), srcOff = src.position(); 160 dst.put(off, ByteBuff.wrap(src), srcOff, len); 161 src.position(srcOff + len); 162 dst.position(off + len); 163 }; 164 165 /** 166 * Transferring all remaining bytes from b to the buffers array starting at offset, or 167 * transferring bytes from the buffers array at offset to b until b is filled. Notice that 168 * position of ByteBuff b will be advanced. 169 * @param offset where we start in the big logical array. 170 * @param b the ByteBuff to transfer from or to 171 * @param transfer the transfer interface. 172 * @return the length of bytes we transferred. 173 */ 174 private int internalTransfer(long offset, ByteBuff b, BiConsumer<ByteBuffer, ByteBuff> transfer) { 175 int expectedTransferLen = b.remaining(); 176 if (expectedTransferLen == 0) { 177 return 0; 178 } 179 BufferIterator it = new BufferIterator(offset, expectedTransferLen); 180 while (it.hasNext()) { 181 ByteBuffer a = it.next(); 182 transfer.accept(a, b); 183 assert !a.hasRemaining(); 184 } 185 assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen 186 + ") don't match the actual transfer length(=" + it.getSum() + ")"; 187 return expectedTransferLen; 188 } 189 190 /** 191 * Creates a sub-array from a given array of ByteBuffers from the given offset to the length 192 * specified. For eg, if there are 4 buffers forming an array each with length 10 and if we call 193 * asSubByteBuffers(5, 10) then we will create an sub-array consisting of two BBs and the first 194 * one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to 195 * 'length' 5. 196 * @param offset the position in the whole array which is composited by multiple byte buffers. 197 * @param len the length of bytes 198 * @return the underlying ByteBuffers, each ByteBuffer is a slice from the backend and will have a 199 * zero position. 200 */ 201 public ByteBuffer[] asSubByteBuffers(long offset, final int len) { 202 BufferIterator it = new BufferIterator(offset, len); 203 ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()]; 204 for (int i = 0; i < mbb.length; i++) { 205 assert it.hasNext(); 206 mbb[i] = it.next(); 207 } 208 assert it.getSum() == len; 209 return mbb; 210 } 211 212 /** 213 * Iterator to fetch ByteBuffers from offset with given length in this big logical array. 214 */ 215 private class BufferIterator implements Iterator<ByteBuffer> { 216 private final int len; 217 private int startBuffer, startOffset, endBuffer, endOffset; 218 private int curIndex, sum = 0; 219 220 private int index(long pos) { 221 return (int) (pos / bufferSize); 222 } 223 224 private int offset(long pos) { 225 return (int) (pos % bufferSize); 226 } 227 228 public BufferIterator(long offset, int len) { 229 assert len >= 0 && offset >= 0; 230 this.len = len; 231 232 this.startBuffer = index(offset); 233 this.startOffset = offset(offset); 234 235 this.endBuffer = index(offset + len); 236 this.endOffset = offset(offset + len); 237 if (startBuffer < endBuffer && endOffset == 0) { 238 endBuffer--; 239 endOffset = bufferSize; 240 } 241 assert startBuffer >= 0 && startBuffer < bufferCount; 242 assert endBuffer >= 0 && endBuffer < bufferCount; 243 244 // initialize the index to the first buffer index. 245 this.curIndex = startBuffer; 246 } 247 248 @Override 249 public boolean hasNext() { 250 return this.curIndex <= endBuffer; 251 } 252 253 /** 254 * The returned ByteBuffer is an sliced one, it won't affect the position or limit of the 255 * original one. 256 */ 257 @Override 258 public ByteBuffer next() { 259 ByteBuffer bb = buffers[curIndex].duplicate(); 260 if (curIndex == startBuffer) { 261 bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len)); 262 } else if (curIndex == endBuffer) { 263 bb.position(0).limit(endOffset); 264 } else { 265 bb.position(0).limit(bufferSize); 266 } 267 curIndex++; 268 sum += bb.remaining(); 269 // Make sure that its pos is zero, it's important because MBB will count from zero for all nio 270 // ByteBuffers. 271 return bb.slice(); 272 } 273 274 int getSum() { 275 return sum; 276 } 277 278 int getBufferCount() { 279 return this.endBuffer - this.startBuffer + 1; 280 } 281 } 282}