001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.util.concurrent.Callable; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Future; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.hbase.nio.ByteBuff; 031import org.apache.hadoop.hbase.nio.MultiByteBuff; 032import org.apache.hadoop.hbase.nio.SingleByteBuff; 033import org.apache.hadoop.util.StringUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 039 040/** 041 * This class manages an array of ByteBuffers with a default size 4MB. These 042 * buffers are sequential and could be considered as a large buffer.It supports 043 * reading/writing data from this large buffer with a position and offset 044 */ 045@InterfaceAudience.Private 046public class ByteBufferArray { 047 private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class); 048 049 public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; 050 @VisibleForTesting 051 ByteBuffer buffers[]; 052 private int bufferSize; 053 @VisibleForTesting 054 int bufferCount; 055 056 /** 057 * We allocate a number of byte buffers as the capacity. In order not to out 058 * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}), 059 * we will allocate one additional buffer with capacity 0; 060 * @param capacity total size of the byte buffer array 061 * @param allocator the ByteBufferAllocator that will create the buffers 062 * @throws IOException throws IOException if there is an exception thrown by the allocator 063 */ 064 public ByteBufferArray(long capacity, ByteBufferAllocator allocator) 065 throws IOException { 066 this.bufferSize = DEFAULT_BUFFER_SIZE; 067 if (this.bufferSize > (capacity / 16)) 068 this.bufferSize = (int) roundUp(capacity / 16, 32768); 069 this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize); 070 LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity) 071 + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" 072 + bufferCount); 073 buffers = new ByteBuffer[bufferCount + 1]; 074 createBuffers(allocator); 075 } 076 077 @VisibleForTesting 078 void createBuffers(ByteBufferAllocator allocator) 079 throws IOException { 080 int threadCount = getThreadCount(); 081 ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L, 082 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 083 int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount); 084 int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1)); 085 Future<ByteBuffer[]>[] futures = new Future[threadCount]; 086 try { 087 for (int i = 0; i < threadCount; i++) { 088 // Last thread will have to deal with a different number of buffers 089 int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount; 090 futures[i] = service.submit( 091 new BufferCreatorCallable(bufferSize, buffersToCreate, allocator)); 092 } 093 int bufferIndex = 0; 094 for (Future<ByteBuffer[]> future : futures) { 095 try { 096 ByteBuffer[] buffers = future.get(); 097 for (ByteBuffer buffer : buffers) { 098 this.buffers[bufferIndex++] = buffer; 099 } 100 } catch (InterruptedException | ExecutionException e) { 101 LOG.error("Buffer creation interrupted", e); 102 throw new IOException(e); 103 } 104 } 105 } finally { 106 service.shutdownNow(); 107 } 108 // always create on heap empty dummy buffer at last 109 this.buffers[bufferCount] = ByteBuffer.allocate(0); 110 } 111 112 @VisibleForTesting 113 int getThreadCount() { 114 return Runtime.getRuntime().availableProcessors(); 115 } 116 117 /** 118 * A callable that creates buffers of the specified length either onheap/offheap using the 119 * {@link ByteBufferAllocator} 120 */ 121 private static class BufferCreatorCallable implements Callable<ByteBuffer[]> { 122 private final int bufferCapacity; 123 private final int bufferCount; 124 private final ByteBufferAllocator allocator; 125 126 BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) { 127 this.bufferCapacity = bufferCapacity; 128 this.bufferCount = bufferCount; 129 this.allocator = allocator; 130 } 131 132 @Override 133 public ByteBuffer[] call() throws Exception { 134 ByteBuffer[] buffers = new ByteBuffer[this.bufferCount]; 135 for (int i = 0; i < this.bufferCount; i++) { 136 buffers[i] = allocator.allocate(this.bufferCapacity); 137 } 138 return buffers; 139 } 140 } 141 142 private long roundUp(long n, long to) { 143 return ((n + to - 1) / to) * to; 144 } 145 146 /** 147 * Transfers bytes from this buffer array into the given destination array 148 * @param start start position in the ByteBufferArray 149 * @param len The maximum number of bytes to be written to the given array 150 * @param dstArray The array into which bytes are to be written 151 * @return number of bytes read 152 */ 153 public int getMultiple(long start, int len, byte[] dstArray) { 154 return getMultiple(start, len, dstArray, 0); 155 } 156 157 /** 158 * Transfers bytes from this buffer array into the given destination array 159 * @param start start offset of this buffer array 160 * @param len The maximum number of bytes to be written to the given array 161 * @param dstArray The array into which bytes are to be written 162 * @param dstOffset The offset within the given array of the first byte to be 163 * written 164 * @return number of bytes read 165 */ 166 public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) { 167 multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR); 168 return len; 169 } 170 171 private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() { 172 @Override 173 public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) { 174 ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len); 175 } 176 }; 177 178 /** 179 * Transfers bytes from the given source array into this buffer array 180 * @param start start offset of this buffer array 181 * @param len The maximum number of bytes to be read from the given array 182 * @param srcArray The array from which bytes are to be read 183 */ 184 public void putMultiple(long start, int len, byte[] srcArray) { 185 putMultiple(start, len, srcArray, 0); 186 } 187 188 /** 189 * Transfers bytes from the given source array into this buffer array 190 * @param start start offset of this buffer array 191 * @param len The maximum number of bytes to be read from the given array 192 * @param srcArray The array from which bytes are to be read 193 * @param srcOffset The offset within the given array of the first byte to be 194 * read 195 */ 196 public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) { 197 multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR); 198 } 199 200 private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() { 201 @Override 202 public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) { 203 ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len); 204 } 205 }; 206 207 private interface Visitor { 208 /** 209 * Visit the given byte buffer, if it is a read action, we will transfer the 210 * bytes from the buffer to the destination array, else if it is a write 211 * action, we will transfer the bytes from the source array to the buffer 212 * @param bb byte buffer 213 * @param pos Start position in ByteBuffer 214 * @param array a source or destination byte array 215 * @param arrayOffset offset of the byte array 216 * @param len read/write length 217 */ 218 void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len); 219 } 220 221 /** 222 * Access(read or write) this buffer array with a position and length as the 223 * given array. Here we will only lock one buffer even if it may be need visit 224 * several buffers. The consistency is guaranteed by the caller. 225 * @param start start offset of this buffer array 226 * @param len The maximum number of bytes to be accessed 227 * @param array The array from/to which bytes are to be read/written 228 * @param arrayOffset The offset within the given array of the first byte to 229 * be read or written 230 * @param visitor implement of how to visit the byte buffer 231 */ 232 void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) { 233 assert len >= 0; 234 long end = start + len; 235 int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize); 236 int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize); 237 assert array.length >= len + arrayOffset; 238 assert startBuffer >= 0 && startBuffer < bufferCount; 239 assert (endBuffer >= 0 && endBuffer < bufferCount) 240 || (endBuffer == bufferCount && endOffset == 0); 241 if (startBuffer >= buffers.length || startBuffer < 0) { 242 String msg = "Failed multiple, start=" + start + ",startBuffer=" 243 + startBuffer + ",bufferSize=" + bufferSize; 244 LOG.error(msg); 245 throw new RuntimeException(msg); 246 } 247 int srcIndex = 0, cnt = -1; 248 for (int i = startBuffer; i <= endBuffer; ++i) { 249 ByteBuffer bb = buffers[i].duplicate(); 250 int pos = 0; 251 if (i == startBuffer) { 252 cnt = bufferSize - startOffset; 253 if (cnt > len) cnt = len; 254 pos = startOffset; 255 } else if (i == endBuffer) { 256 cnt = endOffset; 257 } else { 258 cnt = bufferSize; 259 } 260 visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt); 261 srcIndex += cnt; 262 } 263 assert srcIndex == len; 264 } 265 266 /** 267 * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the 268 * length specified. For eg, if there are 4 buffers forming an array each with length 10 and 269 * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs 270 * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 271 * 'position' 0 to 'length' 5. 272 * @param offset 273 * @param len 274 * @return a ByteBuff formed from the underlying ByteBuffers 275 */ 276 public ByteBuff asSubByteBuff(long offset, int len) { 277 assert len >= 0; 278 long end = offset + len; 279 int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize); 280 int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize); 281 // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that 282 if (endBuffer == this.bufferCount) { 283 endBuffer--; 284 endBufferOffset = bufferSize; 285 } 286 assert startBuffer >= 0 && startBuffer < bufferCount; 287 assert (endBuffer >= 0 && endBuffer < bufferCount) 288 || (endBuffer == bufferCount && endBufferOffset == 0); 289 if (startBuffer >= buffers.length || startBuffer < 0) { 290 String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer 291 + ",bufferSize=" + bufferSize; 292 LOG.error(msg); 293 throw new RuntimeException(msg); 294 } 295 int srcIndex = 0, cnt = -1; 296 ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1]; 297 for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) { 298 ByteBuffer bb = buffers[i].duplicate(); 299 if (i == startBuffer) { 300 cnt = bufferSize - startBufferOffset; 301 if (cnt > len) cnt = len; 302 bb.limit(startBufferOffset + cnt).position(startBufferOffset); 303 } else if (i == endBuffer) { 304 cnt = endBufferOffset; 305 bb.position(0).limit(cnt); 306 } else { 307 cnt = bufferSize; 308 bb.position(0).limit(cnt); 309 } 310 mbb[j] = bb.slice(); 311 srcIndex += cnt; 312 } 313 assert srcIndex == len; 314 if (mbb.length > 1) { 315 return new MultiByteBuff(mbb); 316 } else { 317 return new SingleByteBuff(mbb[0]); 318 } 319 } 320}