/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

import sun.nio.ch.DirectBuffer;

/**
 * ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to the NIO ByteBuffer
 * pool, and it provides high-level interfaces for upstream. When allocating the desired memory
 * size, it will return a {@link ByteBuff}. Once we are sure that those ByteBuffers have reached
 * the end of their life cycle, we must call {@link ByteBuff#release()} to return the buffers to
 * the pool, otherwise a ByteBuffer leak will happen and the NIO ByteBuffer pool may be exhausted.
 * It is possible that the desired memory size is larger than what the ByteBufferPool has; in that
 * case we downgrade to allocating ByteBuffers from heap, which means the GC pressure may increase
 * again. Of course, a better way is increasing the ByteBufferPool size if we detect this case.
 * <br/>
 * <br/>
 * On the other hand, for better memory utilization, we have set a lower bound named
 * minSizeForReservoirUse in this allocator: if the desired size is less than
 * minSizeForReservoirUse, the allocator will just allocate the ByteBuffer from heap and let the
 * JVM free its memory, because it is too wasteful to allocate a single fixed-size ByteBuffer for
 * some small objects. <br/>
 * <br/>
 * We recommend using this class to allocate/free {@link ByteBuff} in the RPC layer or the entire
 * read/write path, because it hides the details of memory management and its APIs are more
 * friendly to the upper layer.
 */
@InterfaceAudience.Private
public class ByteBuffAllocator {

  private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);

  // The on-heap allocator is mostly used for testing, but also has some non-test usage, such as
  // scanning a snapshot: there we won't have an RpcServer to initialize the allocator, so we just
  // use the default heap allocator, which allocates ByteBuffers from heap wrapped by a ByteBuff.
  public static final ByteBuffAllocator HEAP = ByteBuffAllocator.createOnHeap();

  public static final String ALLOCATOR_POOL_ENABLED_KEY = "hbase.server.allocator.pool.enabled";

  public static final String MAX_BUFFER_COUNT_KEY = "hbase.server.allocator.max.buffer.count";

  public static final String BUFFER_SIZE_KEY = "hbase.server.allocator.buffer.size";

  public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size";

  /**
   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
   *             {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead.
   */
  @Deprecated
  public static final String DEPRECATED_ALLOCATOR_POOL_ENABLED_KEY =
      "hbase.ipc.server.reservoir.enabled";

  /**
   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
   *             {@link ByteBuffAllocator#MAX_BUFFER_COUNT_KEY} instead.
   */
  @Deprecated
  static final String DEPRECATED_MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.reservoir.initial.max";

  /**
   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
   *             {@link ByteBuffAllocator#BUFFER_SIZE_KEY} instead.
   */
  @Deprecated
  static final String DEPRECATED_BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";

  /**
   * The hbase.ipc.server.reservoir.initial.max and hbase.ipc.server.reservoir.initial.buffer.size
   * were introduced in HBase2.0.0, while in HBase3.0.0 the two config keys will be replaced by
   * {@link ByteBuffAllocator#MAX_BUFFER_COUNT_KEY} and {@link ByteBuffAllocator#BUFFER_SIZE_KEY}.
   * Also the hbase.ipc.server.reservoir.enabled will be replaced by
   * hbase.server.allocator.pool.enabled. Keep the three old config keys here for HBase2.x
   * compatibility.
   */
  static {
    Configuration.addDeprecation(DEPRECATED_ALLOCATOR_POOL_ENABLED_KEY, ALLOCATOR_POOL_ENABLED_KEY);
    Configuration.addDeprecation(DEPRECATED_MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT_KEY);
    Configuration.addDeprecation(DEPRECATED_BUFFER_SIZE_KEY, BUFFER_SIZE_KEY);
  }

  /**
   * There are some reasons why it is better to choose 65KB (rather than 64KB) as the default
   * buffer size:
   * <p>
   * 1. Almost all of the data blocks have the block size: 64KB + delta, whose delta is very small
   * and depends on the size of lastKeyValue. If we set buffer.size=64KB, then each block will be
   * allocated as a MultiByteBuff: one 64KB DirectByteBuffer and a delta-bytes HeapByteBuffer; the
   * HeapByteBuffer will increase the GC pressure. Ideally, we should let the data block be
   * allocated as a SingleByteBuff, which has a simpler data structure, faster access speed and
   * less heap usage.
   * <p>
   * 2. Since the blocks are MultiByteBuff when using buffer.size=64KB, we have to calculate the
   * checksum by a temporary heap copy (see HBASE-21917), while if it's a SingleByteBuff, we can
   * speed up the checksum by calling hadoop's checksum in the native lib, which is much faster.
   * <p>
   * For performance comparison, please see HBASE-22483.
   */
  public static final int DEFAULT_BUFFER_SIZE = 65 * 1024;

  /** A recycler that does nothing when freed. */
  public static final Recycler NONE = () -> {
  };

  /** Callback invoked to give the resources behind a buffer back to their owner. */
  @FunctionalInterface
  public interface Recycler {
    void free();
  }

  private final boolean reservoirEnabled;
  private final int bufSize;
  private final int maxBufCount;
  private final AtomicInteger usedBufCount = new AtomicInteger(0);

  private boolean maxPoolSizeInfoLevelLogged = false;

  // If the desired size is at least this size, it will be allocated from the ByteBufferPool,
  // otherwise it will be allocated from heap for better utilization. By default this is 1/6th of
  // the pool buffer size.
  private final int minSizeForReservoirUse;

  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();

  // Metrics to track the pool allocation bytes and heap allocation bytes. If heap allocation
  // bytes is increasing so much, then we may need to increase the max.buffer.count .
  private final LongAdder poolAllocationBytes = new LongAdder();
  private final LongAdder heapAllocationBytes = new LongAdder();
  private long lastPoolAllocationBytes = 0;
  private long lastHeapAllocationBytes = 0;

  /**
   * Initialize a {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if
   * the reservoir is enabled and the reservoir has enough buffers, otherwise the allocator will
   * just allocate the insufficient buffers from on-heap to meet the requirement.
   * @param conf which provides the arguments to initialize the allocator.
   * @param reservoirEnabled indicate whether the reservoir is enabled or disabled. NOTICE: if
   *          reservoir is enabled, then we will use the pool allocator to allocate off-heap
   *          ByteBuffers and use the HEAP allocator to allocate heap ByteBuffers. Otherwise if
   *          reservoir is disabled then all allocations will happen in HEAP instance.
   * @return ByteBuffAllocator to manage the byte buffers.
   * @throws IllegalArgumentException if the reservoir is enabled and the configured buffer size
   *           is not positive.
   */
  public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
    if (conf.get(DEPRECATED_BUFFER_SIZE_KEY) != null
        || conf.get(DEPRECATED_MAX_BUFFER_COUNT_KEY) != null) {
      LOG.warn("The config keys {} and {} are deprecated now, instead please use {} and {}. In "
          + "future release we will remove the two deprecated configs.",
        DEPRECATED_BUFFER_SIZE_KEY, DEPRECATED_MAX_BUFFER_COUNT_KEY, BUFFER_SIZE_KEY,
        MAX_BUFFER_COUNT_KEY);
    }
    int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
    if (!reservoirEnabled) {
      return HEAP;
    }
    // A non-positive buffer size would either divide by zero below or build an allocator that can
    // never hand out a usable buffer, so fail fast with a message naming the offending key.
    if (poolBufSize <= 0) {
      throw new IllegalArgumentException(
          "'" + BUFFER_SIZE_KEY + "' must be positive, but was " + poolBufSize);
    }
    // The max number of buffers to be pooled in the ByteBufferPool. The default value has been
    // selected based on the #handlers configured. When it is a read request, 2 MB is the max size
    // at which we will send back one RPC request. Means max we need 2 MB for creating the
    // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
    // include the heap size overhead of each cells also.) Considering 2 MB, we will need
    // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
    // is by default 65 KB (see DEFAULT_BUFFER_SIZE).
    // In case of read request, at the end of the handler process, we will make the response
    // cellblock and add the Call to connection's response Q and a single Responder thread takes
    // connections and responses from that one by one and do the socket write. So there is chances
    // that by the time a handler originated response is actually done writing to socket and so
    // released the BBs it used, the handler might have processed one more read req. On an avg 2x
    // we consider and consider that also for the max buffers to pool
    int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
    int maxBuffCount =
        conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
    int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
    return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
  }

  /**
   * Initialize a {@link ByteBuffAllocator} which only allocates ByteBuffers from on-heap; it is
   * designed for testing purposes or the disabled reservoir case.
   * @return allocator to allocate on-heap ByteBuffer.
   */
  private static ByteBuffAllocator createOnHeap() {
    return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
  }

  @VisibleForTesting
  ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
      int minSizeForReservoirUse) {
    this.reservoirEnabled = reservoirEnabled;
    this.maxBufCount = maxBufCount;
    this.bufSize = bufSize;
    this.minSizeForReservoirUse = minSizeForReservoirUse;
  }

  public boolean isReservoirEnabled() {
    return reservoirEnabled;
  }

  public long getHeapAllocationBytes() {
    return heapAllocationBytes.sum();
  }

  public long getPoolAllocationBytes() {
    return poolAllocationBytes.sum();
  }

  public int getBufferSize() {
    return this.bufSize;
  }

  public int getUsedBufferCount() {
    return this.usedBufCount.intValue();
  }

  /**
   * The {@link ConcurrentLinkedQueue#size()} is O(N) complexity and time-consuming, so DO NOT use
   * the method except in UT.
   */
  @VisibleForTesting
  public int getFreeBufferCount() {
    return this.buffers.size();
  }

  public int getTotalBufferCount() {
    return maxBufCount;
  }

  /**
   * Sum the heap allocation bytes of the given allocators, counting each distinct allocator
   * instance only once.
   * @param allocators the allocators to inspect; duplicates are ignored.
   * @return total bytes allocated from heap by the distinct allocators.
   */
  public static long getHeapAllocationBytes(ByteBuffAllocator... allocators) {
    long heapAllocBytes = 0;
    for (ByteBuffAllocator alloc : Sets.newHashSet(allocators)) {
      heapAllocBytes += alloc.getHeapAllocationBytes();
    }
    return heapAllocBytes;
  }

  /**
   * Compute the fraction of bytes allocated from heap, out of all bytes allocated (heap + pool),
   * since the previous call of this method. As a side effect the per-allocator "last seen"
   * counters are advanced, so consecutive calls report disjoint intervals.
   * @param allocators the allocators to inspect; duplicates are ignored.
   * @return heap bytes / (heap bytes + pool bytes) for the interval, or 0.0 if nothing was
   *         allocated in the interval.
   */
  public static double getHeapAllocationRatio(ByteBuffAllocator... allocators) {
    double heapDelta = 0.0, poolDelta = 0.0;
    long heapAllocBytes, poolAllocBytes;
    // If disabled the pool allocator, then we use the global HEAP allocator. otherwise we use
    // the pool allocator to allocate offheap ByteBuffers and use the HEAP to allocate heap
    // ByteBuffers. So here we use a HashSet to remove the duplicated allocator object in disable
    // case.
    for (ByteBuffAllocator alloc : Sets.newHashSet(allocators)) {
      heapAllocBytes = alloc.heapAllocationBytes.sum();
      poolAllocBytes = alloc.poolAllocationBytes.sum();
      heapDelta += (heapAllocBytes - alloc.lastHeapAllocationBytes);
      poolDelta += (poolAllocBytes - alloc.lastPoolAllocationBytes);
      alloc.lastHeapAllocationBytes = heapAllocBytes;
      alloc.lastPoolAllocationBytes = poolAllocBytes;
    }
    // Calculate the heap allocation ratio.
    if (Math.abs(heapDelta + poolDelta) < 1e-3) {
      return 0.0;
    }
    return heapDelta / (heapDelta + poolDelta);
  }

  /**
   * Allocate a buffer with the buffer size from the ByteBuffAllocator. Note to call
   * {@link ByteBuff#release()} when it is no longer needed, otherwise a memory leak happens in
   * the NIO ByteBuffer pool.
   * @return a ByteBuff with the buffer size.
   */
  public SingleByteBuff allocateOneBuffer() {
    if (isReservoirEnabled()) {
      ByteBuffer bb = getBuffer();
      if (bb != null) {
        return new SingleByteBuff(() -> putbackBuffer(bb), bb);
      }
    }
    // Allocated from heap, let the JVM free its memory.
    return (SingleByteBuff) ByteBuff.wrap(allocateOnHeap(bufSize));
  }

  /**
   * Allocate a heap ByteBuffer of the given size and record it in the heap allocation metric.
   */
  private ByteBuffer allocateOnHeap(int size) {
    heapAllocationBytes.add(size);
    return ByteBuffer.allocate(size);
  }

  /**
   * Allocate size bytes from the ByteBuffAllocator. Note to call {@link ByteBuff#release()} when
   * it is no longer needed, otherwise a memory leak happens in the NIO ByteBuffer pool.
   * @param size to allocate
   * @return a ByteBuff with the desired size.
   * @throws IllegalArgumentException if size is negative.
   */
  public ByteBuff allocate(int size) {
    if (size < 0) {
      throw new IllegalArgumentException("size to allocate should >=0");
    }
    // If disabled the reservoir, just allocate it from on-heap.
    if (!isReservoirEnabled() || size == 0) {
      return ByteBuff.wrap(allocateOnHeap(size));
    }
    int remainder = size % bufSize;
    int len = size / bufSize + (remainder > 0 ? 1 : 0);
    List<ByteBuffer> bbs = new ArrayList<>(len);
    // Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
    // reservoir is exhausted. The explicit 'remain > 0' guard matters when minSizeForReservoirUse
    // is configured as zero or negative: without it the loop would keep pulling pooled buffers
    // after the request was already satisfied.
    int remain = size;
    while (remain > 0 && remain >= minSizeForReservoirUse) {
      ByteBuffer bb = this.getBuffer();
      if (bb == null) {
        break;
      }
      bbs.add(bb);
      remain -= bufSize;
    }
    int lenFromReservoir = bbs.size();
    if (remain > 0) {
      // If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
      // just allocate the ByteBuffer from on-heap.
      bbs.add(allocateOnHeap(remain));
    }
    // Only the leading lenFromReservoir buffers came from the pool; the optional trailing heap
    // buffer is left to the GC.
    ByteBuff bb = ByteBuff.wrap(bbs, () -> {
      for (int i = 0; i < lenFromReservoir; i++) {
        this.putbackBuffer(bbs.get(i));
      }
    });
    bb.limit(size);
    return bb;
  }

  /**
   * Free all direct buffers if allocated, mainly used for testing.
   */
  @VisibleForTesting
  public void clean() {
    while (!buffers.isEmpty()) {
      ByteBuffer b = buffers.poll();
      if (b instanceof DirectBuffer) {
        DirectBuffer db = (DirectBuffer) b;
        if (db.cleaner() != null) {
          db.cleaner().clean();
        }
      }
    }
    this.usedBufCount.set(0);
    this.maxPoolSizeInfoLevelLogged = false;
    this.poolAllocationBytes.reset();
    this.heapAllocationBytes.reset();
    this.lastPoolAllocationBytes = 0;
    this.lastHeapAllocationBytes = 0;
  }

  /**
   * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
   *         the maximum pool size, it will create a new one and return. In case of max pool size
   *         also reached, will return null. When pool returned a ByteBuffer, make sure to return it
   *         back to pool after use.
   */
  private ByteBuffer getBuffer() {
    ByteBuffer bb = buffers.poll();
    if (bb != null) {
      // To reset the limit to capacity and position to 0, must clear here.
      bb.clear();
      poolAllocationBytes.add(bufSize);
      return bb;
    }
    while (true) {
      int c = this.usedBufCount.intValue();
      if (c >= this.maxBufCount) {
        if (!maxPoolSizeInfoLevelLogged) {
          LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
              + "increasing the value for '{}' ?",
            maxBufCount, MAX_BUFFER_COUNT_KEY);
          maxPoolSizeInfoLevelLogged = true;
        }
        return null;
      }
      if (!this.usedBufCount.compareAndSet(c, c + 1)) {
        continue;
      }
      try {
        ByteBuffer created = ByteBuffer.allocateDirect(bufSize);
        poolAllocationBytes.add(bufSize);
        return created;
      } catch (RuntimeException | Error e) {
        // The slot was reserved by the CAS above but no buffer exists for it; give the slot back
        // so a failed direct allocation does not permanently shrink the pool.
        this.usedBufCount.decrementAndGet();
        throw e;
      }
    }
  }

  /**
   * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
   * Buffers whose capacity differs from the pool buffer size, or whose direct/heap kind does not
   * match this allocator, are ignored with a warning.
   * @param buf ByteBuffer to return.
   */
  private void putbackBuffer(ByteBuffer buf) {
    if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
      return;
    }
    buffers.offer(buf);
  }
}