001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.nio.ByteBuffer; 021import java.util.Queue; 022import java.util.concurrent.ConcurrentLinkedQueue; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 030 031/** 032 * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This 033 * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer 034 * that it will create. When requested, if a free ByteBuffer is already present, it will return 035 * that. And when no free ByteBuffer available and we are below the max count, it will create a new 036 * one and return that. 037 * 038 * <p> 039 * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled, 040 * pass 'directByteBuffer' as false while construction of the pool. 041 * <p> 042 * This class is thread safe. 043 * 044 * @see ByteBufferListOutputStream 045 */ 046@InterfaceAudience.Private 047public class ByteBufferPool { 048 private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class); 049 // TODO better config names? 050 // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count 051 // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size 052 public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max"; 053 public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size"; 054 public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size 055 // what we will write/read to/from the 056 // socket channel. 057 private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>(); 058 059 private final int bufferSize; 060 private final int maxPoolSize; 061 private AtomicInteger count; // Count of the BBs created already for this pool. 062 private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers 063 private boolean maxPoolSizeInfoLevelLogged = false; 064 065 /** 066 * @param bufferSize Size of each buffer created by this pool. 067 * @param maxPoolSize Max number of buffers to keep in this pool. 068 */ 069 public ByteBufferPool(int bufferSize, int maxPoolSize) { 070 this(bufferSize, maxPoolSize, true); 071 } 072 073 /** 074 * @param bufferSize Size of each buffer created by this pool. 075 * @param maxPoolSize Max number of buffers to keep in this pool. 076 * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer. 077 */ 078 public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) { 079 this.bufferSize = bufferSize; 080 this.maxPoolSize = maxPoolSize; 081 this.directByteBuffer = directByteBuffer; 082 // TODO can add initialPoolSize config also and make those many BBs ready for use. 083 LOG.info("Created with bufferSize={} and maxPoolSize={}", 084 org.apache.hadoop.util.StringUtils.byteDesc(bufferSize), 085 org.apache.hadoop.util.StringUtils.byteDesc(maxPoolSize)); 086 this.count = new AtomicInteger(0); 087 } 088 089 /** 090 * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the 091 * maximum pool size, it will create a new one and return. In case of max pool size also 092 * reached, will return null. When pool returned a ByteBuffer, make sure to return it back 093 * to pool after use. 094 * @see #putbackBuffer(ByteBuffer) 095 */ 096 public ByteBuffer getBuffer() { 097 ByteBuffer bb = buffers.poll(); 098 if (bb != null) { 099 // Clear sets limit == capacity. Position == 0. 100 bb.clear(); 101 return bb; 102 } 103 while (true) { 104 int c = this.count.intValue(); 105 if (c >= this.maxPoolSize) { 106 if (maxPoolSizeInfoLevelLogged) { 107 if (LOG.isDebugEnabled()) { 108 LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize 109 + " and no free buffers now. Consider increasing the value for '" 110 + MAX_POOL_SIZE_KEY + "' ?"); 111 } 112 } else { 113 LOG.info("Pool already reached its max capacity : " + this.maxPoolSize 114 + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY 115 + "' ?"); 116 maxPoolSizeInfoLevelLogged = true; 117 } 118 return null; 119 } 120 if (!this.count.compareAndSet(c, c + 1)) { 121 continue; 122 } 123 if (LOG.isTraceEnabled()) { 124 LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize); 125 } 126 return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize) 127 : ByteBuffer.allocate(this.bufferSize); 128 } 129 } 130 131 /** 132 * Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not 133 * obtained from this pool. 134 * @param buf ByteBuffer to return. 135 */ 136 public void putbackBuffer(ByteBuffer buf) { 137 if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) { 138 LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored"); 139 return; 140 } 141 buffers.offer(buf); 142 } 143 144 public int getBufferSize() { 145 return this.bufferSize; 146 } 147 148 /** 149 * @return Number of free buffers 150 */ 151 @VisibleForTesting 152 public int getQueueSize() { 153 return buffers.size(); 154 } 155}