001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.nio.ByteBuffer;
021import java.util.Queue;
022import java.util.concurrent.ConcurrentLinkedQueue;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
030
031/**
032 * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
033 * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
034 * that it will create. When requested, if a free ByteBuffer is already present, it will return
035 * that. And when no free ByteBuffer available and we are below the max count, it will create a new
036 * one and return that.
037 *
038 * <p>
039 * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled,
040 * pass 'directByteBuffer' as false while construction of the pool.
041 * <p>
042 * This class is thread safe.
043 *
044 * @see ByteBufferListOutputStream
045 */
046@InterfaceAudience.Private
047public class ByteBufferPool {
048  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
049  // TODO better config names?
050  // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
051  // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
052  public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
053  public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
054  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size
055                                                          // what we will write/read to/from the
056                                                          // socket channel.
057  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
058
059  private final int bufferSize;
060  private final int maxPoolSize;
061  private AtomicInteger count; // Count of the BBs created already for this pool.
062  private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers
063  private boolean maxPoolSizeInfoLevelLogged = false;
064
065  /**
066   * @param bufferSize Size of each buffer created by this pool.
067   * @param maxPoolSize Max number of buffers to keep in this pool.
068   */
069  public ByteBufferPool(int bufferSize, int maxPoolSize) {
070    this(bufferSize, maxPoolSize, true);
071  }
072
073  /**
074   * @param bufferSize Size of each buffer created by this pool.
075   * @param maxPoolSize Max number of buffers to keep in this pool.
076   * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
077   */
078  public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) {
079    this.bufferSize = bufferSize;
080    this.maxPoolSize = maxPoolSize;
081    this.directByteBuffer = directByteBuffer;
082    // TODO can add initialPoolSize config also and make those many BBs ready for use.
083    LOG.info("Created with bufferSize={} and maxPoolSize={}",
084        org.apache.hadoop.util.StringUtils.byteDesc(bufferSize),
085        org.apache.hadoop.util.StringUtils.byteDesc(maxPoolSize));
086    this.count = new AtomicInteger(0);
087  }
088
089  /**
090   * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
091   *         maximum pool size, it will create a new one and return. In case of max pool size also
092   *         reached, will return null. When pool returned a ByteBuffer, make sure to return it back
093   *         to pool after use.
094   * @see #putbackBuffer(ByteBuffer)
095   */
096  public ByteBuffer getBuffer() {
097    ByteBuffer bb = buffers.poll();
098    if (bb != null) {
099      // Clear sets limit == capacity. Position == 0.
100      bb.clear();
101      return bb;
102    }
103    while (true) {
104      int c = this.count.intValue();
105      if (c >= this.maxPoolSize) {
106        if (maxPoolSizeInfoLevelLogged) {
107          if (LOG.isDebugEnabled()) {
108            LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
109                + " and no free buffers now. Consider increasing the value for '"
110                + MAX_POOL_SIZE_KEY + "' ?");
111          }
112        } else {
113          LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
114              + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
115              + "' ?");
116          maxPoolSizeInfoLevelLogged = true;
117        }
118        return null;
119      }
120      if (!this.count.compareAndSet(c, c + 1)) {
121        continue;
122      }
123      if (LOG.isTraceEnabled()) {
124        LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
125      }
126      return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize)
127          : ByteBuffer.allocate(this.bufferSize);
128    }
129  }
130
131  /**
132   * Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not
133   * obtained from this pool.
134   * @param buf ByteBuffer to return.
135   */
136  public void putbackBuffer(ByteBuffer buf) {
137    if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) {
138      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
139      return;
140    }
141    buffers.offer(buf);
142  }
143
144  public int getBufferSize() {
145    return this.bufferSize;
146  }
147
148  /**
149   * @return Number of free buffers
150   */
151  @VisibleForTesting
152  public int getQueueSize() {
153    return buffers.size();
154  }
155}