View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io;
19  
20  import java.nio.ByteBuffer;
21  import java.util.Queue;
22  import java.util.concurrent.atomic.AtomicLong;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.util.BoundedArrayQueue;
29  
30  import com.google.common.annotations.VisibleForTesting;
31  
32  /**
33   * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer.
34   * This pool keeps an upper bound on the count of ByteBuffers in the pool and on the maximum size
35   * of ByteBuffer that it will retain (Hence the pool is 'bounded' as opposed to, say,
36   * Hadoop's ElasticByteBuffferPool).
37   * If a ByteBuffer is bigger than the configured threshold, we will just let the ByteBuffer go
38   * rather than add it to the pool. If more ByteBuffers than the configured maximum instances,
39   * we will not add the passed ByteBuffer to the pool; we will just drop it
40   * (we will log a WARN in this case that we are at capacity).
41   *
42   * <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
43   * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this
44   * class for a couple of seconds to get reporting on how it is running when deployed.
45   *
46   * <p>This class is thread safe.
47   */
48  @InterfaceAudience.Private
49  public class BoundedByteBufferPool {
50    private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class);
51  
52    @VisibleForTesting
53    final Queue<ByteBuffer> buffers;
54  
55    // Maximum size of a ByteBuffer to retain in pool
56    private final int maxByteBufferSizeToCache;
57  
58    // A running average only it only rises, it never recedes
59    @VisibleForTesting
60    volatile int runningAverage;
61  
62    // Scratch that keeps rough total size of pooled bytebuffers
63    private volatile int totalReservoirCapacity;
64  
65    // For reporting
66    private AtomicLong allocations = new AtomicLong(0);
67  
68    private ReentrantLock lock =  new ReentrantLock();
69  
70    /**
71     * @param maxByteBufferSizeToCache
72     * @param initialByteBufferSize
73     * @param maxToCache
74     */
75    public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
76        final int maxToCache) {
77      this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
78      this.runningAverage = initialByteBufferSize;
79      this.buffers = new BoundedArrayQueue<ByteBuffer>(maxToCache);
80    }
81  
82    public ByteBuffer getBuffer() {
83      ByteBuffer bb = null;
84      lock.lock();
85      try {
86        bb = this.buffers.poll();
87        if (bb != null) {
88          this.totalReservoirCapacity -= bb.capacity();
89        }
90      } finally {
91        lock.unlock();
92      }
93      if (bb != null) {
94        // Clear sets limit == capacity. Postion == 0.
95        bb.clear();
96      } else {
97        bb = ByteBuffer.allocate(this.runningAverage);
98        this.allocations.incrementAndGet();
99      }
100     if (LOG.isTraceEnabled()) {
101       LOG.trace("runningAverage=" + this.runningAverage +
102         ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
103         ", alloctions=" + this.allocations.get());
104     }
105     return bb;
106   }
107 
108   public void putBuffer(ByteBuffer bb) {
109     // If buffer is larger than we want to keep around, just let it go.
110     if (bb.capacity() > this.maxByteBufferSizeToCache) return;
111     boolean success = false;
112     int average = 0;
113     lock.lock();
114     try {
115       success = this.buffers.offer(bb);
116       if (success) {
117         this.totalReservoirCapacity += bb.capacity();
118         average = this.totalReservoirCapacity / this.buffers.size(); // size will never be 0.
119       }
120     } finally {
121       lock.unlock();
122     }
123     if (!success) {
124       if (LOG.isDebugEnabled()) {
125         LOG.debug("At capacity: " + this.buffers.size());
126       }
127     } else {
128       if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
129         this.runningAverage = average;
130       }
131     }
132   }
133 }