/**
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.slab;

import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import com.google.common.base.Preconditions;

/**
 * Slab is a class which is designed to allocate blocks of a certain size.
 * The constructor creates a number of DirectByteBuffers and slices them into
 * blocks of the requisite size, then queues up all the slices for allocation.
 * A brief usage sketch follows the class at the bottom of this file.
 */
class Slab implements org.apache.hadoop.hbase.io.HeapSize {
  static final Log LOG = LogFactory.getLog(Slab.class);

  /** This is where our items, or blocks of the slab, are stored. */
  private LinkedBlockingQueue<ByteBuffer> buffers;

  /** This is where our slabs (the backing DirectByteBuffers) are stored. */
  private ConcurrentLinkedQueue<ByteBuffer> slabs;

  private final int blockSize;
  private final int numBlocks;
  private long heapSize;

  Slab(int blockSize, int numBlocks) {
    buffers = new LinkedBlockingQueue<ByteBuffer>();
    slabs = new ConcurrentLinkedQueue<ByteBuffer>();

    this.blockSize = blockSize;
    this.numBlocks = numBlocks;

    this.heapSize = ClassSize.estimateBase(this.getClass(), false);

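    // A ByteBuffer is indexed with a signed int, so a single backing slab is
    // capped at the largest multiple of blockSize that fits in Integer.MAX_VALUE.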
    int maxBlocksPerSlab = Integer.MAX_VALUE / blockSize;
    int maxSlabSize = maxBlocksPerSlab * blockSize;

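    // Allocate as many maximally-sized slabs as needed, plus one smaller slab
    // for any remaining blocks.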
    int numFullSlabs = numBlocks / maxBlocksPerSlab;
    int partialSlabSize = (numBlocks % maxBlocksPerSlab) * blockSize;
    for (int i = 0; i < numFullSlabs; i++) {
      allocateAndSlice(maxSlabSize, blockSize);
    }

    if (partialSlabSize > 0) {
      allocateAndSlice(partialSlabSize, blockSize);
    }
  }

  private void allocateAndSlice(int size, int sliceSize) {
    ByteBuffer newSlab = ByteBuffer.allocateDirect(size);
    slabs.add(newSlab);
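    // Walk the slab in sliceSize strides; each slice() is a view that shares
    // the slab's direct memory but carries its own position and limit.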
    for (int j = 0; j < newSlab.capacity(); j += sliceSize) {
      newSlab.limit(j + sliceSize).position(j);
      ByteBuffer aSlice = newSlab.slice();
      buffers.add(aSlice);
      heapSize += ClassSize.estimateBase(aSlice.getClass(), false);
    }
  }

  /*
   * Shutdown deallocates the memory for all the DirectByteBuffers. Each
   * DirectByteBuffer has a "cleaner" method, which is similar to a
   * destructor in C++. Since every block handed out by alloc is a view onto
   * one of these slabs, this should only be called once no blocks are in use.
   */
  void shutdown() {
    for (ByteBuffer aSlab : slabs) {
      try {
        DirectMemoryUtils.destroyDirectByteBuffer(aSlab);
      } catch (Exception e) {
        LOG.warn("Unable to deallocate direct memory during shutdown", e);
      }
    }
  }

  int getBlockSize() {
    return this.blockSize;
  }

  int getBlockCapacity() {
    return this.numBlocks;
  }

  int getBlocksRemaining() {
    return this.buffers.size();
  }

  /*
   * Returns a block from this slab, blocking until one is available. Throws
   * an IndexOutOfBoundsException if the requested size is bigger than this
   * slab's block size.
   */
  ByteBuffer alloc(int bufferSize) throws InterruptedException {
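    // checkPositionIndex allows 0 <= bufferSize <= blockSize (and returns the
    // value); anything larger throws an IndexOutOfBoundsException.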
    int newCapacity = Preconditions.checkPositionIndex(bufferSize, blockSize);

    ByteBuffer returnedBuffer = buffers.take();

    returnedBuffer.clear().limit(newCapacity);
    return returnedBuffer;
  }

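  /*
   * Returns a block to this slab for reuse. Only the block's capacity is
   * checked; the buffer is not verified to have come from this slab.
   */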
  void free(ByteBuffer toBeFreed) {
    Preconditions.checkArgument(toBeFreed.capacity() == blockSize);
    buffers.add(toBeFreed);
  }

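  /*
   * Note that this estimates the on-heap overhead of the slab and its
   * ByteBuffer views, not the off-heap memory that backs them.
   */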
  @Override
  public long heapSize() {
    return heapSize;
  }
}
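
/*
 * A minimal usage sketch, not part of the original class: it illustrates the
 * alloc/free/shutdown cycle described above, assuming the caller returns every
 * block before shutting the slab down. The class and method names below are
 * purely illustrative.
 */
class SlabUsageSketch {
  static void demo() throws InterruptedException {
    // 16 blocks of 64 KB each: a single 1 MB direct slab sliced into views.
    Slab slab = new Slab(64 * 1024, 16);
    try {
      // Blocks until a slice is free; the returned buffer's limit is set to
      // the requested size, which must not exceed the slab's block size.
      ByteBuffer block = slab.alloc(48 * 1024);
      block.put((byte) 1);   // write into the block
      slab.free(block);      // hand it back for reuse
    } finally {
      // Safe only once no handed-out blocks are still in use, since each one
      // is a view onto the direct memory released here.
      slab.shutdown();
    }
  }
}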