View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.nio.ByteBuffer;
22  import java.util.concurrent.locks.Lock;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.util.StringUtils;
29  
30  /**
31   * This class manages an array of ByteBuffers with a default size 4MB. These
32   * buffers are sequential and could be considered as a large buffer.It supports
33   * reading/writing data from this large buffer with a position and offset
34   */
35  @InterfaceAudience.Private
36  public final class ByteBufferArray {
37    private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
38  
39    static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
40    private ByteBuffer buffers[];
41    private Lock locks[];
42    private int bufferSize;
43    private int bufferCount;
44  
45    /**
46     * We allocate a number of byte buffers as the capacity. In order not to out
47     * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
48     * we will allocate one additional buffer with capacity 0;
49     * @param capacity total size of the byte buffer array
50     * @param directByteBuffer true if we allocate direct buffer
51     */
52    public ByteBufferArray(long capacity, boolean directByteBuffer) {
53      this.bufferSize = DEFAULT_BUFFER_SIZE;
54      if (this.bufferSize > (capacity / 16))
55        this.bufferSize = (int) roundUp(capacity / 16, 32768);
56      this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
57      LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
58          + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
59          + bufferCount + ", direct=" + directByteBuffer);
60      buffers = new ByteBuffer[bufferCount + 1];
61      locks = new Lock[bufferCount + 1];
62      for (int i = 0; i <= bufferCount; i++) {
63        locks[i] = new ReentrantLock();
64        if (i < bufferCount) {
65          buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
66              : ByteBuffer.allocate(bufferSize);
67        } else {
68          buffers[i] = ByteBuffer.allocate(0);
69        }
70  
71      }
72    }
73  
74    private long roundUp(long n, long to) {
75      return ((n + to - 1) / to) * to;
76    }
77  
78    /**
79     * Transfers bytes from this buffer array into the given destination array
80     * @param start start position in the ByteBufferArray
81     * @param len The maximum number of bytes to be written to the given array
82     * @param dstArray The array into which bytes are to be written
83     * @return number of bytes read
84     */
85    public int getMultiple(long start, int len, byte[] dstArray) {
86      return getMultiple(start, len, dstArray, 0);
87    }
88  
89    /**
90     * Transfers bytes from this buffer array into the given destination array
91     * @param start start offset of this buffer array
92     * @param len The maximum number of bytes to be written to the given array
93     * @param dstArray The array into which bytes are to be written
94     * @param dstOffset The offset within the given array of the first byte to be
95     *          written
96     * @return number of bytes read
97     */
98    public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
99      multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
100     return len;
101   }
102 
103   private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
104     @Override
105     public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
106       bb.get(array, arrayIdx, len);
107     }
108   };
109 
110   /**
111    * Transfers bytes from the given source array into this buffer array
112    * @param start start offset of this buffer array
113    * @param len The maximum number of bytes to be read from the given array
114    * @param srcArray The array from which bytes are to be read
115    */
116   public void putMultiple(long start, int len, byte[] srcArray) {
117     putMultiple(start, len, srcArray, 0);
118   }
119 
120   /**
121    * Transfers bytes from the given source array into this buffer array
122    * @param start start offset of this buffer array
123    * @param len The maximum number of bytes to be read from the given array
124    * @param srcArray The array from which bytes are to be read
125    * @param srcOffset The offset within the given array of the first byte to be
126    *          read
127    */
128   public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
129     multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
130   }
131 
132   private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
133     @Override
134     public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
135       bb.put(array, arrayIdx, len);
136     }
137   };
138 
139   private interface Visitor {
140     /**
141      * Visit the given byte buffer, if it is a read action, we will transfer the
142      * bytes from the buffer to the destination array, else if it is a write
143      * action, we will transfer the bytes from the source array to the buffer
144      * @param bb byte buffer
145      * @param array a source or destination byte array
146      * @param arrayOffset offset of the byte array
147      * @param len read/write length
148      */
149     void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
150   }
151 
152   /**
153    * Access(read or write) this buffer array with a position and length as the
154    * given array. Here we will only lock one buffer even if it may be need visit
155    * several buffers. The consistency is guaranteed by the caller.
156    * @param start start offset of this buffer array
157    * @param len The maximum number of bytes to be accessed
158    * @param array The array from/to which bytes are to be read/written
159    * @param arrayOffset The offset within the given array of the first byte to
160    *          be read or written
161    * @param visitor implement of how to visit the byte buffer
162    */
163   void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
164     assert len >= 0;
165     long end = start + len;
166     int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
167     int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
168     assert array.length >= len + arrayOffset;
169     assert startBuffer >= 0 && startBuffer < bufferCount;
170     assert endBuffer >= 0 && endBuffer < bufferCount
171         || (endBuffer == bufferCount && endOffset == 0);
172     if (startBuffer >= locks.length || startBuffer < 0) {
173       String msg = "Failed multiple, start=" + start + ",startBuffer="
174           + startBuffer + ",bufferSize=" + bufferSize;
175       LOG.error(msg);
176       throw new RuntimeException(msg);
177     }
178     int srcIndex = 0, cnt = -1;
179     for (int i = startBuffer; i <= endBuffer; ++i) {
180       Lock lock = locks[i];
181       lock.lock();
182       try {
183         ByteBuffer bb = buffers[i];
184         if (i == startBuffer) {
185           cnt = bufferSize - startOffset;
186           if (cnt > len) cnt = len;
187           bb.limit(startOffset + cnt).position(startOffset);
188         } else if (i == endBuffer) {
189           cnt = endOffset;
190           bb.limit(cnt).position(0);
191         } else {
192           cnt = bufferSize;
193           bb.limit(cnt).position(0);
194         }
195         visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
196         srcIndex += cnt;
197       } finally {
198         lock.unlock();
199       }
200     }
201     assert srcIndex == len;
202   }
203 }