View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile.slab;
20  
21  import java.nio.ByteBuffer;
22  import java.util.concurrent.ConcurrentLinkedQueue;
23  import java.util.concurrent.LinkedBlockingQueue;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.util.ClassSize;
29  import org.apache.hadoop.hbase.util.DirectMemoryUtils;
30  import com.google.common.base.Preconditions;
31  
32  /**
33   * Slab is a class which is designed to allocate blocks of a certain size.
34   * Constructor creates a number of DirectByteBuffers and slices them into the
35   * requisite size, then puts them all in a buffer.
36   **/
37  
38  @InterfaceAudience.Private
39  class Slab implements org.apache.hadoop.hbase.io.HeapSize {
40    static final Log LOG = LogFactory.getLog(Slab.class);
41  
42    /** This is where our items, or blocks of the slab, are stored. */
43    private LinkedBlockingQueue<ByteBuffer> buffers;
44  
45    /** This is where our Slabs are stored */
46    private ConcurrentLinkedQueue<ByteBuffer> slabs;
47  
48    private final int blockSize;
49    private final int numBlocks;
50    private long heapSize;
51  
52    Slab(int blockSize, int numBlocks) {
53      buffers = new LinkedBlockingQueue<ByteBuffer>();
54      slabs = new ConcurrentLinkedQueue<ByteBuffer>();
55  
56      this.blockSize = blockSize;
57      this.numBlocks = numBlocks;
58  
59      this.heapSize = ClassSize.estimateBase(this.getClass(), false);
60  
61      int maxBlocksPerSlab = Integer.MAX_VALUE / blockSize;
62      int maxSlabSize = maxBlocksPerSlab * blockSize;
63  
64      int numFullSlabs = numBlocks / maxBlocksPerSlab;
65      int partialSlabSize = (numBlocks % maxBlocksPerSlab) * blockSize;
66      for (int i = 0; i < numFullSlabs; i++) {
67        allocateAndSlice(maxSlabSize, blockSize);
68      }
69  
70      if (partialSlabSize > 0) {
71        allocateAndSlice(partialSlabSize, blockSize);
72      }
73    }
74  
75    private void allocateAndSlice(int size, int sliceSize) {
76      ByteBuffer newSlab = ByteBuffer.allocateDirect(size);
77      slabs.add(newSlab);
78      for (int j = 0; j < newSlab.capacity(); j += sliceSize) {
79        newSlab.limit(j + sliceSize).position(j);
80        ByteBuffer aSlice = newSlab.slice();
81        buffers.add(aSlice);
82        heapSize += ClassSize.estimateBase(aSlice.getClass(), false);
83      }
84    }
85  
86    /*
87     * Shutdown deallocates the memory for all the DirectByteBuffers. Each
88     * DirectByteBuffer has a "cleaner" method, which is similar to a
89     * deconstructor in C++.
90     */
91    void shutdown() {
92      for (ByteBuffer aSlab : slabs) {
93        try {
94          DirectMemoryUtils.destroyDirectByteBuffer(aSlab);
95        } catch (Exception e) {
96          LOG.warn("Unable to deallocate direct memory during shutdown", e);
97        }
98      }
99    }
100 
101   int getBlockSize() {
102     return this.blockSize;
103   }
104 
105   int getBlockCapacity() {
106     return this.numBlocks;
107   }
108 
109   int getBlocksRemaining() {
110     return this.buffers.size();
111   }
112 
113   /*
114    * Throws an exception if you try to allocate a
115    * bigger size than the allocator can handle. Alloc will block until a buffer is available.
116    */
117   ByteBuffer alloc(int bufferSize) throws InterruptedException {
118     int newCapacity = Preconditions.checkPositionIndex(bufferSize, blockSize);
119 
120     ByteBuffer returnedBuffer = buffers.take();
121 
122     returnedBuffer.clear().limit(newCapacity);
123     return returnedBuffer;
124   }
125 
126   void free(ByteBuffer toBeFreed) {
127     Preconditions.checkArgument(toBeFreed.capacity() == blockSize);
128     buffers.add(toBeFreed);
129   }
130 
131   @Override
132   public long heapSize() {
133     return heapSize;
134   }
135 }