View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.nio.ByteBuffer;
22  import java.util.concurrent.locks.Lock;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.nio.ByteBuff;
29  import org.apache.hadoop.hbase.nio.MultiByteBuff;
30  import org.apache.hadoop.hbase.nio.SingleByteBuff;
31  import org.apache.hadoop.util.StringUtils;
32  
33  /**
34   * This class manages an array of ByteBuffers with a default size 4MB. These
35   * buffers are sequential and could be considered as a large buffer.It supports
36   * reading/writing data from this large buffer with a position and offset
37   */
38  @InterfaceAudience.Private
39  public final class ByteBufferArray {
40    private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
41  
42    static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
43    private ByteBuffer buffers[];
44    private Lock locks[];
45    private int bufferSize;
46    private int bufferCount;
47  
48    /**
49     * We allocate a number of byte buffers as the capacity. In order not to out
50     * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
51     * we will allocate one additional buffer with capacity 0;
52     * @param capacity total size of the byte buffer array
53     * @param directByteBuffer true if we allocate direct buffer
54     */
55    public ByteBufferArray(long capacity, boolean directByteBuffer) {
56      this.bufferSize = DEFAULT_BUFFER_SIZE;
57      if (this.bufferSize > (capacity / 16))
58        this.bufferSize = (int) roundUp(capacity / 16, 32768);
59      this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
60      LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
61          + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
62          + bufferCount + ", direct=" + directByteBuffer);
63      buffers = new ByteBuffer[bufferCount + 1];
64      locks = new Lock[bufferCount + 1];
65      for (int i = 0; i <= bufferCount; i++) {
66        locks[i] = new ReentrantLock();
67        if (i < bufferCount) {
68          buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
69              : ByteBuffer.allocate(bufferSize);
70        } else {
71          buffers[i] = ByteBuffer.allocate(0);
72        }
73  
74      }
75    }
76  
77    private long roundUp(long n, long to) {
78      return ((n + to - 1) / to) * to;
79    }
80  
81    /**
82     * Transfers bytes from this buffer array into the given destination array
83     * @param start start position in the ByteBufferArray
84     * @param len The maximum number of bytes to be written to the given array
85     * @param dstArray The array into which bytes are to be written
86     * @return number of bytes read
87     */
88    public int getMultiple(long start, int len, byte[] dstArray) {
89      return getMultiple(start, len, dstArray, 0);
90    }
91  
92    /**
93     * Transfers bytes from this buffer array into the given destination array
94     * @param start start offset of this buffer array
95     * @param len The maximum number of bytes to be written to the given array
96     * @param dstArray The array into which bytes are to be written
97     * @param dstOffset The offset within the given array of the first byte to be
98     *          written
99     * @return number of bytes read
100    */
101   public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
102     multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
103     return len;
104   }
105 
106   private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
107     @Override
108     public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
109       bb.get(array, arrayIdx, len);
110     }
111   };
112 
113   /**
114    * Transfers bytes from the given source array into this buffer array
115    * @param start start offset of this buffer array
116    * @param len The maximum number of bytes to be read from the given array
117    * @param srcArray The array from which bytes are to be read
118    */
119   public void putMultiple(long start, int len, byte[] srcArray) {
120     putMultiple(start, len, srcArray, 0);
121   }
122 
123   /**
124    * Transfers bytes from the given source array into this buffer array
125    * @param start start offset of this buffer array
126    * @param len The maximum number of bytes to be read from the given array
127    * @param srcArray The array from which bytes are to be read
128    * @param srcOffset The offset within the given array of the first byte to be
129    *          read
130    */
131   public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
132     multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
133   }
134 
135   private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
136     @Override
137     public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
138       bb.put(array, arrayIdx, len);
139     }
140   };
141 
142   private interface Visitor {
143     /**
144      * Visit the given byte buffer, if it is a read action, we will transfer the
145      * bytes from the buffer to the destination array, else if it is a write
146      * action, we will transfer the bytes from the source array to the buffer
147      * @param bb byte buffer
148      * @param array a source or destination byte array
149      * @param arrayOffset offset of the byte array
150      * @param len read/write length
151      */
152     void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
153   }
154 
155   /**
156    * Access(read or write) this buffer array with a position and length as the
157    * given array. Here we will only lock one buffer even if it may be need visit
158    * several buffers. The consistency is guaranteed by the caller.
159    * @param start start offset of this buffer array
160    * @param len The maximum number of bytes to be accessed
161    * @param array The array from/to which bytes are to be read/written
162    * @param arrayOffset The offset within the given array of the first byte to
163    *          be read or written
164    * @param visitor implement of how to visit the byte buffer
165    */
166   void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
167     assert len >= 0;
168     long end = start + len;
169     int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
170     int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
171     assert array.length >= len + arrayOffset;
172     assert startBuffer >= 0 && startBuffer < bufferCount;
173     assert endBuffer >= 0 && endBuffer < bufferCount
174         || (endBuffer == bufferCount && endOffset == 0);
175     if (startBuffer >= locks.length || startBuffer < 0) {
176       String msg = "Failed multiple, start=" + start + ",startBuffer="
177           + startBuffer + ",bufferSize=" + bufferSize;
178       LOG.error(msg);
179       throw new RuntimeException(msg);
180     }
181     int srcIndex = 0, cnt = -1;
182     for (int i = startBuffer; i <= endBuffer; ++i) {
183       Lock lock = locks[i];
184       lock.lock();
185       try {
186         ByteBuffer bb = buffers[i];
187         if (i == startBuffer) {
188           cnt = bufferSize - startOffset;
189           if (cnt > len) cnt = len;
190           bb.limit(startOffset + cnt).position(startOffset);
191         } else if (i == endBuffer) {
192           cnt = endOffset;
193           bb.limit(cnt).position(0);
194         } else {
195           cnt = bufferSize;
196           bb.limit(cnt).position(0);
197         }
198         visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
199         srcIndex += cnt;
200       } finally {
201         lock.unlock();
202       }
203     }
204     assert srcIndex == len;
205   }
206 
207   /**
208    * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
209    * length specified. For eg, if there are 4 buffers forming an array each with length 10 and
210    * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
211    * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
212    * 'position' 0 to 'length' 5.
213    * @param offset
214    * @param len
215    * @return a ByteBuff formed from the underlying ByteBuffers
216    */
217   public ByteBuff asSubByteBuff(long offset, int len) {
218     assert len >= 0;
219     long end = offset + len;
220     int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
221     int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
222     assert startBuffer >= 0 && startBuffer < bufferCount;
223     assert endBuffer >= 0 && endBuffer < bufferCount
224         || (endBuffer == bufferCount && endBufferOffset == 0);
225     if (startBuffer >= locks.length || startBuffer < 0) {
226       String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
227           + ",bufferSize=" + bufferSize;
228       LOG.error(msg);
229       throw new RuntimeException(msg);
230     }
231     int srcIndex = 0, cnt = -1;
232     ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
233     for (int i = startBuffer,j=0; i <= endBuffer; ++i,j++) {
234       Lock lock = locks[i];
235       lock.lock();
236       try {
237         ByteBuffer bb = buffers[i];
238         if (i == startBuffer) {
239           cnt = bufferSize - startBufferOffset;
240           if (cnt > len) cnt = len;
241           ByteBuffer dup = bb.duplicate();
242           dup.limit(startBufferOffset + cnt).position(startBufferOffset);
243           mbb[j] = dup.slice();
244         } else if (i == endBuffer) {
245           cnt = endBufferOffset;
246           ByteBuffer dup = bb.duplicate();
247           dup.position(0).limit(cnt);
248           mbb[j] = dup.slice();
249         } else {
250           cnt = bufferSize ;
251           ByteBuffer dup = bb.duplicate();
252           dup.position(0).limit(cnt);
253           mbb[j] = dup.slice();
254         }
255         srcIndex += cnt;
256       } finally {
257         lock.unlock();
258       }
259     }
260     assert srcIndex == len;
261     if (mbb.length > 1) {
262       return new MultiByteBuff(mbb);
263     } else {
264       return new SingleByteBuff(mbb[0]);
265     }
266   }
267 }