001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Future;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.nio.MultiByteBuff;
032import org.apache.hadoop.hbase.nio.SingleByteBuff;
033import org.apache.hadoop.util.StringUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
039
040/**
041 * This class manages an array of ByteBuffers with a default size 4MB. These
042 * buffers are sequential and could be considered as a large buffer.It supports
043 * reading/writing data from this large buffer with a position and offset
044 */
045@InterfaceAudience.Private
046public class ByteBufferArray {
047  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class);
048
049  public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
050  @VisibleForTesting
051  ByteBuffer buffers[];
052  private int bufferSize;
053  @VisibleForTesting
054  int bufferCount;
055
056  /**
057   * We allocate a number of byte buffers as the capacity. In order not to out
058   * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
059   * we will allocate one additional buffer with capacity 0;
060   * @param capacity total size of the byte buffer array
061   * @param allocator the ByteBufferAllocator that will create the buffers
062   * @throws IOException throws IOException if there is an exception thrown by the allocator
063   */
064  public ByteBufferArray(long capacity, ByteBufferAllocator allocator)
065      throws IOException {
066    this.bufferSize = DEFAULT_BUFFER_SIZE;
067    if (this.bufferSize > (capacity / 16))
068      this.bufferSize = (int) roundUp(capacity / 16, 32768);
069    this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
070    LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
071        + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
072        + bufferCount);
073    buffers = new ByteBuffer[bufferCount + 1];
074    createBuffers(allocator);
075  }
076
077  @VisibleForTesting
078  void createBuffers(ByteBufferAllocator allocator)
079      throws IOException {
080    int threadCount = getThreadCount();
081    ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
082        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
083    int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount);
084    int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
085    Future<ByteBuffer[]>[] futures = new Future[threadCount];
086    try {
087      for (int i = 0; i < threadCount; i++) {
088        // Last thread will have to deal with a different number of buffers
089        int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
090        futures[i] = service.submit(
091          new BufferCreatorCallable(bufferSize, buffersToCreate, allocator));
092      }
093      int bufferIndex = 0;
094      for (Future<ByteBuffer[]> future : futures) {
095        try {
096          ByteBuffer[] buffers = future.get();
097          for (ByteBuffer buffer : buffers) {
098            this.buffers[bufferIndex++] = buffer;
099          }
100        } catch (InterruptedException | ExecutionException e) {
101          LOG.error("Buffer creation interrupted", e);
102          throw new IOException(e);
103        }
104      }
105    } finally {
106      service.shutdownNow();
107    }
108    // always create on heap empty dummy buffer at last
109    this.buffers[bufferCount] = ByteBuffer.allocate(0);
110  }
111
112  @VisibleForTesting
113  int getThreadCount() {
114    return Runtime.getRuntime().availableProcessors();
115  }
116
117  /**
118   * A callable that creates buffers of the specified length either onheap/offheap using the
119   * {@link ByteBufferAllocator}
120   */
121  private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
122    private final int bufferCapacity;
123    private final int bufferCount;
124    private final ByteBufferAllocator allocator;
125
126    BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) {
127      this.bufferCapacity = bufferCapacity;
128      this.bufferCount = bufferCount;
129      this.allocator = allocator;
130    }
131
132    @Override
133    public ByteBuffer[] call() throws Exception {
134      ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
135      for (int i = 0; i < this.bufferCount; i++) {
136        buffers[i] = allocator.allocate(this.bufferCapacity);
137      }
138      return buffers;
139    }
140  }
141
142  private long roundUp(long n, long to) {
143    return ((n + to - 1) / to) * to;
144  }
145
146  /**
147   * Transfers bytes from this buffer array into the given destination array
148   * @param start start position in the ByteBufferArray
149   * @param len The maximum number of bytes to be written to the given array
150   * @param dstArray The array into which bytes are to be written
151   * @return number of bytes read
152   */
153  public int getMultiple(long start, int len, byte[] dstArray) {
154    return getMultiple(start, len, dstArray, 0);
155  }
156
157  /**
158   * Transfers bytes from this buffer array into the given destination array
159   * @param start start offset of this buffer array
160   * @param len The maximum number of bytes to be written to the given array
161   * @param dstArray The array into which bytes are to be written
162   * @param dstOffset The offset within the given array of the first byte to be
163   *          written
164   * @return number of bytes read
165   */
166  public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
167    multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
168    return len;
169  }
170
171  private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
172    @Override
173    public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
174      ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len);
175    }
176  };
177
178  /**
179   * Transfers bytes from the given source array into this buffer array
180   * @param start start offset of this buffer array
181   * @param len The maximum number of bytes to be read from the given array
182   * @param srcArray The array from which bytes are to be read
183   */
184  public void putMultiple(long start, int len, byte[] srcArray) {
185    putMultiple(start, len, srcArray, 0);
186  }
187
188  /**
189   * Transfers bytes from the given source array into this buffer array
190   * @param start start offset of this buffer array
191   * @param len The maximum number of bytes to be read from the given array
192   * @param srcArray The array from which bytes are to be read
193   * @param srcOffset The offset within the given array of the first byte to be
194   *          read
195   */
196  public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
197    multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
198  }
199
200  private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
201    @Override
202    public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
203      ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len);
204    }
205  };
206
207  private interface Visitor {
208    /**
209     * Visit the given byte buffer, if it is a read action, we will transfer the
210     * bytes from the buffer to the destination array, else if it is a write
211     * action, we will transfer the bytes from the source array to the buffer
212     * @param bb byte buffer
213     * @param pos Start position in ByteBuffer
214     * @param array a source or destination byte array
215     * @param arrayOffset offset of the byte array
216     * @param len read/write length
217     */
218    void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len);
219  }
220
221  /**
222   * Access(read or write) this buffer array with a position and length as the
223   * given array. Here we will only lock one buffer even if it may be need visit
224   * several buffers. The consistency is guaranteed by the caller.
225   * @param start start offset of this buffer array
226   * @param len The maximum number of bytes to be accessed
227   * @param array The array from/to which bytes are to be read/written
228   * @param arrayOffset The offset within the given array of the first byte to
229   *          be read or written
230   * @param visitor implement of how to visit the byte buffer
231   */
232  void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
233    assert len >= 0;
234    long end = start + len;
235    int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
236    int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
237    assert array.length >= len + arrayOffset;
238    assert startBuffer >= 0 && startBuffer < bufferCount;
239    assert (endBuffer >= 0 && endBuffer < bufferCount)
240        || (endBuffer == bufferCount && endOffset == 0);
241    if (startBuffer >= buffers.length || startBuffer < 0) {
242      String msg = "Failed multiple, start=" + start + ",startBuffer="
243          + startBuffer + ",bufferSize=" + bufferSize;
244      LOG.error(msg);
245      throw new RuntimeException(msg);
246    }
247    int srcIndex = 0, cnt = -1;
248    for (int i = startBuffer; i <= endBuffer; ++i) {
249      ByteBuffer bb = buffers[i].duplicate();
250      int pos = 0;
251      if (i == startBuffer) {
252        cnt = bufferSize - startOffset;
253        if (cnt > len) cnt = len;
254        pos = startOffset;
255      } else if (i == endBuffer) {
256        cnt = endOffset;
257      } else {
258        cnt = bufferSize;
259      }
260      visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt);
261      srcIndex += cnt;
262    }
263    assert srcIndex == len;
264  }
265
266  /**
267   * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
268   * length specified. For eg, if there are 4 buffers forming an array each with length 10 and
269   * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
270   * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
271   * 'position' 0 to 'length' 5.
272   * @param offset
273   * @param len
274   * @return a ByteBuff formed from the underlying ByteBuffers
275   */
276  public ByteBuff asSubByteBuff(long offset, int len) {
277    assert len >= 0;
278    long end = offset + len;
279    int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
280    int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
281    // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that
282    if (endBuffer == this.bufferCount) {
283      endBuffer--;
284      endBufferOffset = bufferSize;
285    }
286    assert startBuffer >= 0 && startBuffer < bufferCount;
287    assert (endBuffer >= 0 && endBuffer < bufferCount)
288        || (endBuffer == bufferCount && endBufferOffset == 0);
289    if (startBuffer >= buffers.length || startBuffer < 0) {
290      String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
291          + ",bufferSize=" + bufferSize;
292      LOG.error(msg);
293      throw new RuntimeException(msg);
294    }
295    int srcIndex = 0, cnt = -1;
296    ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
297    for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) {
298      ByteBuffer bb = buffers[i].duplicate();
299      if (i == startBuffer) {
300        cnt = bufferSize - startBufferOffset;
301        if (cnt > len) cnt = len;
302        bb.limit(startBufferOffset + cnt).position(startBufferOffset);
303      } else if (i == endBuffer) {
304        cnt = endBufferOffset;
305        bb.position(0).limit(cnt);
306      } else {
307        cnt = bufferSize;
308        bb.position(0).limit(cnt);
309      }
310      mbb[j] = bb.slice();
311      srcIndex += cnt;
312    }
313    assert srcIndex == len;
314    if (mbb.length > 1) {
315      return new MultiByteBuff(mbb);
316    } else {
317      return new SingleByteBuff(mbb[0]);
318    }
319  }
320}