View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.concurrent.locks.Lock;
24  import java.util.concurrent.locks.ReentrantLock;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.nio.ByteBuff;
30  import org.apache.hadoop.hbase.nio.MultiByteBuff;
31  import org.apache.hadoop.hbase.nio.SingleByteBuff;
32  import org.apache.hadoop.util.StringUtils;
33  
34  /**
35   * This class manages an array of ByteBuffers with a default size 4MB. These
36   * buffers are sequential and could be considered as a large buffer.It supports
37   * reading/writing data from this large buffer with a position and offset
38   */
39  @InterfaceAudience.Private
40  public final class ByteBufferArray {
41    private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
42  
43    public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
44    private ByteBuffer buffers[];
45    private Lock locks[];
46    private int bufferSize;
47    private int bufferCount;
48    private ByteBufferAllocator allocator;
49    /**
50     * We allocate a number of byte buffers as the capacity. In order not to out
51     * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
52     * we will allocate one additional buffer with capacity 0;
53     * @param capacity total size of the byte buffer array
54     * @param directByteBuffer true if we allocate direct buffer
55     * @param allocator the ByteBufferAllocator that will create the buffers
56     * @throws IOException throws IOException if there is an exception thrown by the allocator
57     */
58    public ByteBufferArray(long capacity, boolean directByteBuffer, ByteBufferAllocator allocator)
59        throws IOException {
60      this.bufferSize = DEFAULT_BUFFER_SIZE;
61      if (this.bufferSize > (capacity / 16))
62        this.bufferSize = (int) roundUp(capacity / 16, 32768);
63      this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
64      LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
65          + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
66          + bufferCount + ", direct=" + directByteBuffer);
67      buffers = new ByteBuffer[bufferCount + 1];
68      locks = new Lock[bufferCount + 1];
69      this.allocator = allocator;
70      for (int i = 0; i <= bufferCount; i++) {
71        locks[i] = new ReentrantLock();
72        if (i < bufferCount) {
73          buffers[i] = allocator.allocate(bufferSize, directByteBuffer);
74        } else {
75          // always create on heap
76          buffers[i] = ByteBuffer.allocate(0);
77        }
78      }
79    }
80  
81    private long roundUp(long n, long to) {
82      return ((n + to - 1) / to) * to;
83    }
84  
85    /**
86     * Transfers bytes from this buffer array into the given destination array
87     * @param start start position in the ByteBufferArray
88     * @param len The maximum number of bytes to be written to the given array
89     * @param dstArray The array into which bytes are to be written
90     * @return number of bytes read
91     */
92    public int getMultiple(long start, int len, byte[] dstArray) {
93      return getMultiple(start, len, dstArray, 0);
94    }
95  
96    /**
97     * Transfers bytes from this buffer array into the given destination array
98     * @param start start offset of this buffer array
99     * @param len The maximum number of bytes to be written to the given array
100    * @param dstArray The array into which bytes are to be written
101    * @param dstOffset The offset within the given array of the first byte to be
102    *          written
103    * @return number of bytes read
104    */
105   public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
106     multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
107     return len;
108   }
109 
110   private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
111     @Override
112     public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
113       bb.get(array, arrayIdx, len);
114     }
115   };
116 
117   /**
118    * Transfers bytes from the given source array into this buffer array
119    * @param start start offset of this buffer array
120    * @param len The maximum number of bytes to be read from the given array
121    * @param srcArray The array from which bytes are to be read
122    */
123   public void putMultiple(long start, int len, byte[] srcArray) {
124     putMultiple(start, len, srcArray, 0);
125   }
126 
127   /**
128    * Transfers bytes from the given source array into this buffer array
129    * @param start start offset of this buffer array
130    * @param len The maximum number of bytes to be read from the given array
131    * @param srcArray The array from which bytes are to be read
132    * @param srcOffset The offset within the given array of the first byte to be
133    *          read
134    */
135   public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
136     multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
137   }
138 
139   private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
140     @Override
141     public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
142       bb.put(array, arrayIdx, len);
143     }
144   };
145 
146   private interface Visitor {
147     /**
148      * Visit the given byte buffer, if it is a read action, we will transfer the
149      * bytes from the buffer to the destination array, else if it is a write
150      * action, we will transfer the bytes from the source array to the buffer
151      * @param bb byte buffer
152      * @param array a source or destination byte array
153      * @param arrayOffset offset of the byte array
154      * @param len read/write length
155      */
156     void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
157   }
158 
159   /**
160    * Access(read or write) this buffer array with a position and length as the
161    * given array. Here we will only lock one buffer even if it may be need visit
162    * several buffers. The consistency is guaranteed by the caller.
163    * @param start start offset of this buffer array
164    * @param len The maximum number of bytes to be accessed
165    * @param array The array from/to which bytes are to be read/written
166    * @param arrayOffset The offset within the given array of the first byte to
167    *          be read or written
168    * @param visitor implement of how to visit the byte buffer
169    */
170   void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
171     assert len >= 0;
172     long end = start + len;
173     int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
174     int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
175     assert array.length >= len + arrayOffset;
176     assert startBuffer >= 0 && startBuffer < bufferCount;
177     assert endBuffer >= 0 && endBuffer < bufferCount
178         || (endBuffer == bufferCount && endOffset == 0);
179     if (startBuffer >= locks.length || startBuffer < 0) {
180       String msg = "Failed multiple, start=" + start + ",startBuffer="
181           + startBuffer + ",bufferSize=" + bufferSize;
182       LOG.error(msg);
183       throw new RuntimeException(msg);
184     }
185     int srcIndex = 0, cnt = -1;
186     for (int i = startBuffer; i <= endBuffer; ++i) {
187       Lock lock = locks[i];
188       lock.lock();
189       try {
190         ByteBuffer bb = buffers[i];
191         if (i == startBuffer) {
192           cnt = bufferSize - startOffset;
193           if (cnt > len) cnt = len;
194           bb.limit(startOffset + cnt).position(startOffset);
195         } else if (i == endBuffer) {
196           cnt = endOffset;
197           bb.limit(cnt).position(0);
198         } else {
199           cnt = bufferSize;
200           bb.limit(cnt).position(0);
201         }
202         visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
203         srcIndex += cnt;
204       } finally {
205         lock.unlock();
206       }
207     }
208     assert srcIndex == len;
209   }
210 
211   /**
212    * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
213    * length specified. For eg, if there are 4 buffers forming an array each with length 10 and
214    * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
215    * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
216    * 'position' 0 to 'length' 5.
217    * @param offset
218    * @param len
219    * @return a ByteBuff formed from the underlying ByteBuffers
220    */
221   public ByteBuff asSubByteBuff(long offset, int len) {
222     assert len >= 0;
223     long end = offset + len;
224     int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
225     int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
226     // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that
227     if (endBuffer == this.bufferCount) {
228       endBuffer--;
229       endBufferOffset = bufferSize;
230     }
231     assert startBuffer >= 0 && startBuffer < bufferCount;
232     assert endBuffer >= 0 && endBuffer < bufferCount
233         || (endBuffer == bufferCount && endBufferOffset == 0);
234     if (startBuffer >= locks.length || startBuffer < 0) {
235       String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
236           + ",bufferSize=" + bufferSize;
237       LOG.error(msg);
238       throw new RuntimeException(msg);
239     }
240     int srcIndex = 0, cnt = -1;
241     ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
242     for (int i = startBuffer,j=0; i <= endBuffer; ++i,j++) {
243       Lock lock = locks[i];
244       lock.lock();
245       try {
246         ByteBuffer bb = buffers[i];
247         if (i == startBuffer) {
248           cnt = bufferSize - startBufferOffset;
249           if (cnt > len) cnt = len;
250           ByteBuffer dup = bb.duplicate();
251           dup.limit(startBufferOffset + cnt).position(startBufferOffset);
252           mbb[j] = dup.slice();
253         } else if (i == endBuffer) {
254           cnt = endBufferOffset;
255           ByteBuffer dup = bb.duplicate();
256           dup.position(0).limit(cnt);
257           mbb[j] = dup.slice();
258         } else {
259           cnt = bufferSize ;
260           ByteBuffer dup = bb.duplicate();
261           dup.position(0).limit(cnt);
262           mbb[j] = dup.slice();
263         }
264         srcIndex += cnt;
265       } finally {
266         lock.unlock();
267       }
268     }
269     assert srcIndex == len;
270     if (mbb.length > 1) {
271       return new MultiByteBuff(mbb);
272     } else {
273       return new SingleByteBuff(mbb[0]);
274     }
275   }
276 }