/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * ByteBuffAllocator is a nio ByteBuffer pool. It returns {@link ByteBuff}s which are usually
 * wrappers of off-heap {@link ByteBuffer}s. Once we are sure that a returned ByteBuff has reached
 * the end of its life cycle, we must call {@link ByteBuff#release()} to return its buffers to the
 * pool, otherwise the pool will leak. If the desired memory size is larger than what the
 * ByteBufferPool has available, we downgrade to allocating ByteBuffers from the heap. Increase the
 * ByteBufferPool size if you detect this case.<br/>
 * <br/>
 * For better memory/pool utilization, there is a lower bound named
 * <code>minSizeForReservoirUse</code> in this allocator, and if the desired size is less than
 * <code>minSizeForReservoirUse</code>, the allocator will just allocate the ByteBuffer from heap
 * and let the JVM manage memory, because it is better not to waste a pool slot by handing out a
 * whole fixed-size ByteBuffer for a small object.<br/>
 * <br/>
 * This pool can be used anywhere it makes sense managing memory. Currently used at least by RPC.
 */
@InterfaceAudience.Private
public class ByteBuffAllocator {

  private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);

  // The on-heap allocator is mostly used for testing but also has some non-test usage such as
  // for scanning snapshot. This implementation will just allocate ByteBuffers from heap but
  // wrapped by ByteBuff.
  public static final ByteBuffAllocator HEAP = ByteBuffAllocator.createOnHeap();

  public static final String ALLOCATOR_POOL_ENABLED_KEY = "hbase.server.allocator.pool.enabled";

  public static final String MAX_BUFFER_COUNT_KEY = "hbase.server.allocator.max.buffer.count";

  public static final String BUFFER_SIZE_KEY = "hbase.server.allocator.buffer.size";

  public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size";

  /**
   * Set an alternate bytebuffallocator by setting this config, e.g. we can config
   * {@link DeallocateRewriteByteBuffAllocator} to find out prematurely release issues
   */
  public static final String BYTEBUFF_ALLOCATOR_CLASS = "hbase.bytebuff.allocator.class";

  /**
   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
   *             {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead.
   */
  @Deprecated
  public static final String DEPRECATED_ALLOCATOR_POOL_ENABLED_KEY =
    "hbase.ipc.server.reservoir.enabled";

  /**
   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
   *             {@link ByteBuffAllocator#MAX_BUFFER_COUNT_KEY} instead.
   */
  @Deprecated
  static final String DEPRECATED_MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.reservoir.initial.max";

  /**
   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
   *             {@link ByteBuffAllocator#BUFFER_SIZE_KEY} instead.
   */
  @Deprecated
  static final String DEPRECATED_BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";

  /**
   * There are some reasons why it is better to choose 65KB (rather than 64KB) as the default
   * buffer size:
   * <p>
   * 1. Almost all of the data blocks have the block size: 64KB + delta, whose delta is very small,
   * depends on the size of lastKeyValue. If we set buffer.size=64KB, then each block will be
   * allocated as a MultiByteBuff: one 64KB DirectByteBuffer and delta bytes HeapByteBuffer, the
   * HeapByteBuffer will increase the GC pressure. Ideally, we should let the data block be
   * allocated as a SingleByteBuff, it has simpler data structure, faster access speed, less heap
   * usage.
   * <p>
   * 2. Since the blocks are MultiByteBuff when using buffer.size=64KB, we have to calculate the
   * checksum by a temporary heap copy (see HBASE-21917), while if it's a SingleByteBuff, we can
   * speed up the checksum by calling the hadoop checksum in native lib, which is faster.
   * <p>
   * For performance comparison, please see HBASE-22483.
   */
  public static final int DEFAULT_BUFFER_SIZE = 65 * 1024;

  /** A {@link Recycler} whose {@link Recycler#free()} does nothing. */
  public static final Recycler NONE = () -> {
  };

  /** Callback carrying the action that gives a buffer's resources back. */
  @FunctionalInterface
  public interface Recycler {
    void free();
  }

  protected final boolean reservoirEnabled;
  protected final int bufSize;
  private final int maxBufCount;
  private final AtomicInteger usedBufCount = new AtomicInteger(0);

  private boolean maxPoolSizeInfoLevelLogged = false;

  // If the desired size is at least this size, it'll be allocated from ByteBufferPool, otherwise
  // it'll be allocated from heap for better utilization. create() defaults this to 1/6th of the
  // pool buffer size.
  private final int minSizeForReservoirUse;

  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();

  // Metrics to track the pool allocation bytes and heap allocation bytes. If heap allocation
  // bytes is increasing so much, then we may need to increase the max.buffer.count .
  private final LongAdder poolAllocationBytes = new LongAdder();
  private final LongAdder heapAllocationBytes = new LongAdder();
  private long lastPoolAllocationBytes = 0;
  private long lastHeapAllocationBytes = 0;

  /**
   * Initialize a {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if
   * reservoir is enabled and the reservoir has enough buffers, otherwise the allocator will just
   * allocate the insufficient buffers from on-heap to meet the requirement.
   * @param conf             which get the arguments to initialize the allocator.
   * @param reservoirEnabled indicate whether the reservoir is enabled or disabled. NOTICE: if
   *                         reservoir is enabled, then we will use the pool allocator to allocate
   *                         off-heap ByteBuffers and use the HEAP allocator to allocate heap
   *                         ByteBuffers. Otherwise if reservoir is disabled then all allocations
   *                         will happen in HEAP instance.
   * @return ByteBuffAllocator to manage the byte buffers.
   */
  public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
    int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
    if (reservoirEnabled) {
      // The max number of buffers to be pooled in the ByteBufferPool. The default value been
      // selected based on the #handlers configured. When it is read request, 2 MB is the max size
      // at which we will send back one RPC request. Means max we need 2 MB for creating the
      // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
      // include the heap size overhead of each cells also.) Considering 2 MB, we will need
      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
      // is by default 65 KB (see DEFAULT_BUFFER_SIZE).
      // In case of read request, at the end of the handler process, we will make the response
      // cellblock and add the Call to connection's response Q and a single Responder thread takes
      // connections and responses from that one by one and do the socket write. So there is chances
      // that by the time a handler originated response is actually done writing to socket and so
      // released the BBs it used, the handler might have processed one more read req. On an avg 2x
      // we consider and consider that also for the max buffers to pool
      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
      int maxBuffCount =
        conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
      int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
      Class<?> clazz = conf.getClass(BYTEBUFF_ALLOCATOR_CLASS, ByteBuffAllocator.class);
      return (ByteBuffAllocator) ReflectionUtils.newInstance(clazz, true, maxBuffCount, poolBufSize,
        minSizeForReservoirUse);
    } else {
      return HEAP;
    }
  }

  /**
   * Initialize a {@link ByteBuffAllocator} which only allocates ByteBuffers from on-heap; it's
   * designed for testing purposes or the disabled-reservoir case.
   * @return allocator to allocate on-heap ByteBuffer.
   */
  private static ByteBuffAllocator createOnHeap() {
    return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
  }

  /**
   * Creates an allocator with the given settings. No buffer is created here; pooled buffers are
   * created on demand by {@link #getBuffer()}.
   * @param reservoirEnabled       whether pooled buffers may be handed out
   * @param maxBufCount            upper bound on the number of pooled buffers ever created
   * @param bufSize                capacity in bytes of every pooled buffer
   * @param minSizeForReservoirUse requests (or request tails) smaller than this are served from
   *                               the heap instead of the pool
   */
  protected ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
    int minSizeForReservoirUse) {
    this.reservoirEnabled = reservoirEnabled;
    this.maxBufCount = maxBufCount;
    this.bufSize = bufSize;
    this.minSizeForReservoirUse = minSizeForReservoirUse;
  }

  /** Returns whether this allocator hands out pooled buffers. */
  public boolean isReservoirEnabled() {
    return reservoirEnabled;
  }

  /** Returns the total number of bytes this allocator has allocated from the heap. */
  public long getHeapAllocationBytes() {
    return heapAllocationBytes.sum();
  }

  /** Returns the total number of bytes this allocator has handed out from the pool. */
  public long getPoolAllocationBytes() {
    return poolAllocationBytes.sum();
  }

  /** Returns the capacity in bytes of each pooled buffer. */
  public int getBufferSize() {
    return this.bufSize;
  }

  /**
   * Returns the number of pooled buffers created so far. The counter is incremented each time a
   * new buffer is created for the pool and is only reset by {@link #clean()}.
   */
  public int getUsedBufferCount() {
    return this.usedBufCount.intValue();
  }

  /**
   * The {@link ConcurrentLinkedQueue#size()} is O(N) complexity and time-consuming, so DO NOT use
   * the method except in UT.
   */
  public int getFreeBufferCount() {
    return this.buffers.size();
  }

  /** Returns the configured maximum number of pooled buffers. */
  public int getTotalBufferCount() {
    return maxBufCount;
  }

  /**
   * Sums {@link #getHeapAllocationBytes()} over the given allocators, counting an allocator that
   * is passed more than once only once.
   * @param allocators the allocators to inspect
   * @return the combined heap allocation bytes
   */
  public static long getHeapAllocationBytes(ByteBuffAllocator... allocators) {
    long heapAllocBytes = 0;
    for (ByteBuffAllocator alloc : Sets.newHashSet(allocators)) {
      heapAllocBytes += alloc.getHeapAllocationBytes();
    }
    return heapAllocBytes;
  }

  /**
   * Computes which fraction of the bytes allocated since the previous call of this method came
   * from the heap. As a side effect the per-allocator snapshots used to compute the deltas are
   * updated (without synchronization).
   * @param allocators the allocators to inspect
   * @return heap bytes / (heap bytes + pool bytes) for the interval, or 0.0 if nothing was
   *         allocated in the interval
   */
  public static double getHeapAllocationRatio(ByteBuffAllocator... allocators) {
    double heapDelta = 0.0, poolDelta = 0.0;
    long heapAllocBytes, poolAllocBytes;
    // If disabled the pool allocator, then we use the global HEAP allocator. otherwise we use
    // the pool allocator to allocate offheap ByteBuffers and use the HEAP to allocate heap
    // ByteBuffers. So here we use a HashSet to remove the duplicated allocator object in disable
    // case.
    for (ByteBuffAllocator alloc : Sets.newHashSet(allocators)) {
      heapAllocBytes = alloc.heapAllocationBytes.sum();
      poolAllocBytes = alloc.poolAllocationBytes.sum();
      heapDelta += (heapAllocBytes - alloc.lastHeapAllocationBytes);
      poolDelta += (poolAllocBytes - alloc.lastPoolAllocationBytes);
      alloc.lastHeapAllocationBytes = heapAllocBytes;
      alloc.lastPoolAllocationBytes = poolAllocBytes;
    }
    // Calculate the heap allocation ratio.
    if (Math.abs(heapDelta + poolDelta) < 1e-3) {
      return 0.0;
    }
    return heapDelta / (heapDelta + poolDelta);
  }

  /**
   * Allocate a buffer with buffer size from ByteBuffAllocator. Note to call the
   * {@link ByteBuff#release()} if no need any more, otherwise the memory leak happen in NIO
   * ByteBuffer pool.
   * @return a ByteBuff with the buffer size.
   */
  public SingleByteBuff allocateOneBuffer() {
    if (isReservoirEnabled()) {
      ByteBuffer bb = getBuffer();
      if (bb != null) {
        return new SingleByteBuff(() -> putbackBuffer(bb), bb);
      }
    }
    // Allocated from heap, let the JVM free its memory.
    return (SingleByteBuff) ByteBuff.wrap(allocateOnHeap(bufSize));
  }

  /**
   * Allocates a heap ByteBuffer of the given size and records it in the heap allocation metric.
   * @param size capacity in bytes of the buffer to allocate
   * @return a new heap ByteBuffer
   */
  private ByteBuffer allocateOnHeap(int size) {
    heapAllocationBytes.add(size);
    return ByteBuffer.allocate(size);
  }

  /**
   * Allocate size bytes from the ByteBufAllocator. Note to call the {@link ByteBuff#release()} if
   * no need any more, otherwise the memory leak happen in NIO ByteBuffer pool.
   * @param size to allocate
   * @return a ByteBuff with the desired size.
   * @throws IllegalArgumentException if {@code size} is negative
   */
  public ByteBuff allocate(int size) {
    if (size < 0) {
      throw new IllegalArgumentException("size to allocate should >=0");
    }
    // If disabled the reservoir, just allocate it from on-heap.
    if (!isReservoirEnabled() || size == 0) {
      return ByteBuff.wrap(allocateOnHeap(size));
    }
    int remainder = size % bufSize;
    int len = size / bufSize + (remainder > 0 ? 1 : 0);
    List<ByteBuffer> bbs = new ArrayList<>(len);
    // Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
    // reservoir is exhausted. The explicit "remain > 0" guard matters when minSizeForReservoirUse
    // is configured as zero or negative: without it the loop would keep pulling buffers out of the
    // pool after the request is already fully covered.
    int remain = size;
    while (remain > 0 && remain >= minSizeForReservoirUse) {
      ByteBuffer bb = this.getBuffer();
      if (bb == null) {
        break;
      }
      bbs.add(bb);
      remain -= bufSize;
    }
    int lenFromReservoir = bbs.size();
    if (remain > 0) {
      // If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
      // just allocate the ByteBuffer from on-heap.
      bbs.add(allocateOnHeap(remain));
    }

    ByteBuff bb;
    // we only need a recycler if we successfully pulled from the pool
    // this matters for determining whether to add leak detection in RefCnt
    if (lenFromReservoir == 0) {
      bb = ByteBuff.wrap(bbs);
    } else {
      bb = ByteBuff.wrap(bbs, () -> {
        // Only the leading lenFromReservoir entries came from the pool; a trailing heap buffer,
        // if any, is left to the JVM.
        for (int i = 0; i < lenFromReservoir; i++) {
          this.putbackBuffer(bbs.get(i));
        }
      });
    }

    bb.limit(size);
    return bb;
  }

  /**
   * Free all direct buffers if allocated, mainly used for testing.
   */
  public void clean() {
    // Poll until the queue reports empty by returning null. A separate isEmpty() check followed by
    // poll() is not atomic on a concurrent queue, so another thread could take the last buffer in
    // between and make poll() return null.
    ByteBuffer b;
    while ((b = buffers.poll()) != null) {
      if (b.isDirect()) {
        UnsafeAccess.freeDirectBuffer(b);
      }
    }
    this.usedBufCount.set(0);
    this.maxPoolSizeInfoLevelLogged = false;
    this.poolAllocationBytes.reset();
    this.heapAllocationBytes.reset();
    this.lastPoolAllocationBytes = 0;
    this.lastHeapAllocationBytes = 0;
  }

  /**
   * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
   *         the maximum pool size, it will create a new one and return. In case of max pool size
   *         also reached, will return null. When pool returned a ByteBuffer, make sure to return it
   *         back to pool after use.
   */
  private ByteBuffer getBuffer() {
    ByteBuffer bb = buffers.poll();
    if (bb != null) {
      // To reset the limit to capacity and position to 0, must clear here.
      bb.clear();
      poolAllocationBytes.add(bufSize);
      return bb;
    }
    // No free buffer: try to reserve a slot for a brand new one, retrying if another thread
    // changed the counter between our read and the compare-and-set.
    while (true) {
      int c = this.usedBufCount.intValue();
      if (c >= this.maxBufCount) {
        if (!maxPoolSizeInfoLevelLogged) {
          LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
            + "increasing the value for '{}' ?", maxBufCount, MAX_BUFFER_COUNT_KEY);
          maxPoolSizeInfoLevelLogged = true;
        }
        return null;
      }
      if (!this.usedBufCount.compareAndSet(c, c + 1)) {
        continue;
      }
      poolAllocationBytes.add(bufSize);
      return ByteBuffer.allocateDirect(bufSize);
    }
  }

  /**
   * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
   * Buffers whose capacity differs from the pool buffer size, or whose direct/heap kind does not
   * match this allocator, are rejected with a warning instead of being pooled.
   * @param buf ByteBuffer to return.
   */
  protected void putbackBuffer(ByteBuffer buf) {
    if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
      return;
    }
    buffers.offer(buf);
  }
}