001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.function.BiConsumer; 030 031import org.apache.hadoop.hbase.nio.ByteBuff; 032import org.apache.hadoop.util.StringUtils; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 038 039/** 040 * This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential 041 * and could be considered as a large buffer.It supports reading/writing data from this large buffer 042 * with a position and offset 043 */ 044@InterfaceAudience.Private 045public class ByteBufferArray { 046 private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class); 047 048 public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; 049 private final int bufferSize; 050 private final int bufferCount; 051 final ByteBuffer[] buffers; 052 053 /** 054 * We allocate a number of byte buffers as the capacity. 055 * @param capacity total size of the byte buffer array 056 * @param allocator the ByteBufferAllocator that will create the buffers 057 * @throws IOException throws IOException if there is an exception thrown by the allocator 058 */ 059 public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException { 060 this(getBufferSize(capacity), getBufferCount(capacity), 061 Runtime.getRuntime().availableProcessors(), capacity, allocator); 062 } 063 064 @VisibleForTesting 065 ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity, 066 ByteBufferAllocator alloc) throws IOException { 067 this.bufferSize = bufferSize; 068 this.bufferCount = bufferCount; 069 LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}", 070 StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount); 071 this.buffers = new ByteBuffer[bufferCount]; 072 createBuffers(threadCount, alloc); 073 } 074 075 private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException { 076 ExecutorService pool = Executors.newFixedThreadPool(threadCount); 077 int perThreadCount = bufferCount / threadCount; 078 int reminder = bufferCount % threadCount; 079 try { 080 List<Future<ByteBuffer[]>> futures = new ArrayList<>(threadCount); 081 // Dispatch the creation task to each thread. 082 for (int i = 0; i < threadCount; i++) { 083 final int chunkSize = perThreadCount + ((i == threadCount - 1) ? reminder : 0); 084 futures.add(pool.submit(() -> { 085 ByteBuffer[] chunk = new ByteBuffer[chunkSize]; 086 for (int k = 0; k < chunkSize; k++) { 087 chunk[k] = alloc.allocate(bufferSize); 088 } 089 return chunk; 090 })); 091 } 092 // Append the buffers created by each thread. 093 int bufferIndex = 0; 094 try { 095 for (Future<ByteBuffer[]> f : futures) { 096 for (ByteBuffer b : f.get()) { 097 this.buffers[bufferIndex++] = b; 098 } 099 } 100 assert bufferIndex == bufferCount; 101 } catch (Exception e) { 102 LOG.error("Buffer creation interrupted", e); 103 throw new IOException(e); 104 } 105 } finally { 106 pool.shutdownNow(); 107 } 108 } 109 110 @VisibleForTesting 111 static int getBufferSize(long capacity) { 112 int bufferSize = DEFAULT_BUFFER_SIZE; 113 if (bufferSize > (capacity / 16)) { 114 bufferSize = (int) roundUp(capacity / 16, 32768); 115 } 116 return bufferSize; 117 } 118 119 private static int getBufferCount(long capacity) { 120 int bufferSize = getBufferSize(capacity); 121 return (int) (roundUp(capacity, bufferSize) / bufferSize); 122 } 123 124 private static long roundUp(long n, long to) { 125 return ((n + to - 1) / to) * to; 126 } 127 128 /** 129 * Transfers bytes from this buffers array into the given destination {@link ByteBuff} 130 * @param offset start position in this big logical array. 131 * @param dst the destination ByteBuff. Notice that its position will be advanced. 132 * @return number of bytes read 133 */ 134 public int read(long offset, ByteBuff dst) { 135 return internalTransfer(offset, dst, READER); 136 } 137 138 /** 139 * Transfers bytes from the given source {@link ByteBuff} into this buffer array 140 * @param offset start offset of this big logical array. 141 * @param src the source ByteBuff. Notice that its position will be advanced. 142 * @return number of bytes write 143 */ 144 public int write(long offset, ByteBuff src) { 145 return internalTransfer(offset, src, WRITER); 146 } 147 148 /** 149 * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both 150 * source and destination will be advanced. 151 */ 152 private static final BiConsumer<ByteBuffer, ByteBuff> WRITER = (dst, src) -> { 153 int off = src.position(), len = dst.remaining(); 154 src.get(dst, off, len); 155 src.position(off + len); 156 }; 157 158 /** 159 * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}, Position of both 160 * source and destination will be advanced. 161 */ 162 private static final BiConsumer<ByteBuffer, ByteBuff> READER = (src, dst) -> { 163 int off = dst.position(), len = src.remaining(), srcOff = src.position(); 164 dst.put(off, ByteBuff.wrap(src), srcOff, len); 165 src.position(srcOff + len); 166 dst.position(off + len); 167 }; 168 169 /** 170 * Transferring all remaining bytes from b to the buffers array starting at offset, or 171 * transferring bytes from the buffers array at offset to b until b is filled. Notice that 172 * position of ByteBuff b will be advanced. 173 * @param offset where we start in the big logical array. 174 * @param b the ByteBuff to transfer from or to 175 * @param transfer the transfer interface. 176 * @return the length of bytes we transferred. 177 */ 178 private int internalTransfer(long offset, ByteBuff b, BiConsumer<ByteBuffer, ByteBuff> transfer) { 179 int expectedTransferLen = b.remaining(); 180 if (expectedTransferLen == 0) { 181 return 0; 182 } 183 BufferIterator it = new BufferIterator(offset, expectedTransferLen); 184 while (it.hasNext()) { 185 ByteBuffer a = it.next(); 186 transfer.accept(a, b); 187 assert !a.hasRemaining(); 188 } 189 assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen 190 + ") don't match the actual transfer length(=" + it.getSum() + ")"; 191 return expectedTransferLen; 192 } 193 194 /** 195 * Creates a sub-array from a given array of ByteBuffers from the given offset to the length 196 * specified. For eg, if there are 4 buffers forming an array each with length 10 and if we call 197 * asSubByteBuffers(5, 10) then we will create an sub-array consisting of two BBs and the first 198 * one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to 199 * 'length' 5. 200 * @param offset the position in the whole array which is composited by multiple byte buffers. 201 * @param len the length of bytes 202 * @return the underlying ByteBuffers, each ByteBuffer is a slice from the backend and will have a 203 * zero position. 204 */ 205 public ByteBuffer[] asSubByteBuffers(long offset, final int len) { 206 BufferIterator it = new BufferIterator(offset, len); 207 ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()]; 208 for (int i = 0; i < mbb.length; i++) { 209 assert it.hasNext(); 210 mbb[i] = it.next(); 211 } 212 assert it.getSum() == len; 213 return mbb; 214 } 215 216 /** 217 * Iterator to fetch ByteBuffers from offset with given length in this big logical array. 218 */ 219 private class BufferIterator implements Iterator<ByteBuffer> { 220 private final int len; 221 private int startBuffer, startOffset, endBuffer, endOffset; 222 private int curIndex, sum = 0; 223 224 private int index(long pos) { 225 return (int) (pos / bufferSize); 226 } 227 228 private int offset(long pos) { 229 return (int) (pos % bufferSize); 230 } 231 232 public BufferIterator(long offset, int len) { 233 assert len >= 0 && offset >= 0; 234 this.len = len; 235 236 this.startBuffer = index(offset); 237 this.startOffset = offset(offset); 238 239 this.endBuffer = index(offset + len); 240 this.endOffset = offset(offset + len); 241 if (startBuffer < endBuffer && endOffset == 0) { 242 endBuffer--; 243 endOffset = bufferSize; 244 } 245 assert startBuffer >= 0 && startBuffer < bufferCount; 246 assert endBuffer >= 0 && endBuffer < bufferCount; 247 248 // initialize the index to the first buffer index. 249 this.curIndex = startBuffer; 250 } 251 252 @Override 253 public boolean hasNext() { 254 return this.curIndex <= endBuffer; 255 } 256 257 /** 258 * The returned ByteBuffer is an sliced one, it won't affect the position or limit of the 259 * original one. 260 */ 261 @Override 262 public ByteBuffer next() { 263 ByteBuffer bb = buffers[curIndex].duplicate(); 264 if (curIndex == startBuffer) { 265 bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len)); 266 } else if (curIndex == endBuffer) { 267 bb.position(0).limit(endOffset); 268 } else { 269 bb.position(0).limit(bufferSize); 270 } 271 curIndex++; 272 sum += bb.remaining(); 273 // Make sure that its pos is zero, it's important because MBB will count from zero for all nio 274 // ByteBuffers. 275 return bb.slice(); 276 } 277 278 int getSum() { 279 return sum; 280 } 281 282 int getBufferCount() { 283 return this.endBuffer - this.startBuffer + 1; 284 } 285 } 286}