View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.nio.ByteBuff;
28  import org.apache.hadoop.hbase.nio.MultiByteBuff;
29  import org.apache.hadoop.hbase.nio.SingleByteBuff;
30  import org.apache.hadoop.util.StringUtils;
31
32  /**
33   * This class manages an array of ByteBuffers with a default size 4MB. These
34   * buffers are sequential and could be considered as a large buffer.It supports
35   * reading/writing data from this large buffer with a position and offset
36   */
37  @InterfaceAudience.Private
38  public final class ByteBufferArray {
39    private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
40
41    public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
42    private ByteBuffer buffers[];
43    private int bufferSize;
44    private int bufferCount;
45
46    /**
47     * We allocate a number of byte buffers as the capacity. In order not to out
48     * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
49     * we will allocate one additional buffer with capacity 0;
50     * @param capacity total size of the byte buffer array
51     * @param directByteBuffer true if we allocate direct buffer
52     * @param allocator the ByteBufferAllocator that will create the buffers
53     * @throws IOException throws IOException if there is an exception thrown by the allocator
54     */
55    public ByteBufferArray(long capacity, boolean directByteBuffer, ByteBufferAllocator allocator)
56        throws IOException {
57      this.bufferSize = DEFAULT_BUFFER_SIZE;
58      if (this.bufferSize > (capacity / 16))
59        this.bufferSize = (int) roundUp(capacity / 16, 32768);
60      this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
61      LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
62          + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
63          + bufferCount + ", direct=" + directByteBuffer);
64      buffers = new ByteBuffer[bufferCount + 1];
65      for (int i = 0; i <= bufferCount; i++) {
66        if (i < bufferCount) {
67          buffers[i] = allocator.allocate(bufferSize, directByteBuffer);
68        } else {
69          // always create on heap
70          buffers[i] = ByteBuffer.allocate(0);
71        }
72      }
73    }
74
75    private long roundUp(long n, long to) {
76      return ((n + to - 1) / to) * to;
77    }
78
79    /**
80     * Transfers bytes from this buffer array into the given destination array
81     * @param start start position in the ByteBufferArray
82     * @param len The maximum number of bytes to be written to the given array
83     * @param dstArray The array into which bytes are to be written
84     * @return number of bytes read
85     */
86    public int getMultiple(long start, int len, byte[] dstArray) {
87      return getMultiple(start, len, dstArray, 0);
88    }
89
90    /**
91     * Transfers bytes from this buffer array into the given destination array
92     * @param start start offset of this buffer array
93     * @param len The maximum number of bytes to be written to the given array
94     * @param dstArray The array into which bytes are to be written
95     * @param dstOffset The offset within the given array of the first byte to be
96     *          written
97     * @return number of bytes read
98     */
99    public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
100     multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
101     return len;
102   }
103
104   private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
105     @Override
106     public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
107       ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len);
108     }
109   };
110
111   /**
112    * Transfers bytes from the given source array into this buffer array
113    * @param start start offset of this buffer array
114    * @param len The maximum number of bytes to be read from the given array
115    * @param srcArray The array from which bytes are to be read
116    */
117   public void putMultiple(long start, int len, byte[] srcArray) {
118     putMultiple(start, len, srcArray, 0);
119   }
120
121   /**
122    * Transfers bytes from the given source array into this buffer array
123    * @param start start offset of this buffer array
124    * @param len The maximum number of bytes to be read from the given array
125    * @param srcArray The array from which bytes are to be read
126    * @param srcOffset The offset within the given array of the first byte to be
127    *          read
128    */
129   public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
130     multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
131   }
132
133   private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
134     @Override
135     public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
136       ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len);
137     }
138   };
139
140   private interface Visitor {
141     /**
142      * Visit the given byte buffer, if it is a read action, we will transfer the
143      * bytes from the buffer to the destination array, else if it is a write
144      * action, we will transfer the bytes from the source array to the buffer
145      * @param bb byte buffer
146      * @param pos Start position in ByteBuffer
147      * @param array a source or destination byte array
148      * @param arrayOffset offset of the byte array
149      * @param len read/write length
150      */
151     void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len);
152   }
153
154   /**
155    * Access(read or write) this buffer array with a position and length as the
156    * given array. Here we will only lock one buffer even if it may be need visit
157    * several buffers. The consistency is guaranteed by the caller.
158    * @param start start offset of this buffer array
159    * @param len The maximum number of bytes to be accessed
160    * @param array The array from/to which bytes are to be read/written
161    * @param arrayOffset The offset within the given array of the first byte to
162    *          be read or written
163    * @param visitor implement of how to visit the byte buffer
164    */
165   void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
166     assert len >= 0;
167     long end = start + len;
168     int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
169     int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
170     assert array.length >= len + arrayOffset;
171     assert startBuffer >= 0 && startBuffer < bufferCount;
172     assert endBuffer >= 0 && endBuffer < bufferCount
173         || (endBuffer == bufferCount && endOffset == 0);
174     if (startBuffer >= buffers.length || startBuffer < 0) {
175       String msg = "Failed multiple, start=" + start + ",startBuffer="
176           + startBuffer + ",bufferSize=" + bufferSize;
177       LOG.error(msg);
178       throw new RuntimeException(msg);
179     }
180     int srcIndex = 0, cnt = -1;
181     for (int i = startBuffer; i <= endBuffer; ++i) {
182       ByteBuffer bb = buffers[i].duplicate();
183       int pos = 0;
184       if (i == startBuffer) {
185         cnt = bufferSize - startOffset;
186         if (cnt > len) cnt = len;
187         pos = startOffset;
188       } else if (i == endBuffer) {
189         cnt = endOffset;
190       } else {
191         cnt = bufferSize;
192       }
193       visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt);
194       srcIndex += cnt;
195     }
196     assert srcIndex == len;
197   }
198
199   /**
200    * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
201    * length specified. For eg, if there are 4 buffers forming an array each with length 10 and
202    * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
203    * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
204    * 'position' 0 to 'length' 5.
205    * @param offset
206    * @param len
207    * @return a ByteBuff formed from the underlying ByteBuffers
208    */
209   public ByteBuff asSubByteBuff(long offset, int len) {
210     assert len >= 0;
211     long end = offset + len;
212     int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
213     int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
214     // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that
215     if (endBuffer == this.bufferCount) {
216       endBuffer--;
217       endBufferOffset = bufferSize;
218     }
219     assert startBuffer >= 0 && startBuffer < bufferCount;
220     assert endBuffer >= 0 && endBuffer < bufferCount
221         || (endBuffer == bufferCount && endBufferOffset == 0);
222     if (startBuffer >= buffers.length || startBuffer < 0) {
223       String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
224           + ",bufferSize=" + bufferSize;
225       LOG.error(msg);
226       throw new RuntimeException(msg);
227     }
228     int srcIndex = 0, cnt = -1;
229     ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
230     for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) {
231       ByteBuffer bb = buffers[i].duplicate();
232       if (i == startBuffer) {
233         cnt = bufferSize - startBufferOffset;
234         if (cnt > len) cnt = len;
235         bb.limit(startBufferOffset + cnt).position(startBufferOffset);
236       } else if (i == endBuffer) {
237         cnt = endBufferOffset;
238         bb.position(0).limit(cnt);
239       } else {
240         cnt = bufferSize;
241         bb.position(0).limit(cnt);
242       }
243       mbb[j] = bb.slice();
244       srcIndex += cnt;
245     }
246     assert srcIndex == len;
247     if (mbb.length > 1) {
248       return new MultiByteBuff(mbb);
249     } else {
250       return new SingleByteBuff(mbb[0]);
251     }
252   }
253 }