001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io;
020
021import java.nio.ByteBuffer;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Queue;
025import java.util.concurrent.ConcurrentLinkedQueue;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.atomic.LongAdder;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.nio.SingleByteBuff;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import sun.nio.ch.DirectBuffer;
036
037import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
038
039/**
040 * ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and
041 * it provide high-level interfaces for upstream. when allocating desired memory size, it will
042 * return {@link ByteBuff}, if we are sure that those ByteBuffers have reached the end of life
043 * cycle, we must do the {@link ByteBuff#release()} to return back the buffers to the pool,
044 * otherwise ByteBuffers leak will happen, and the NIO ByteBuffer pool may be exhausted. there's
045 * possible that the desired memory size is large than ByteBufferPool has, we'll downgrade to
046 * allocate ByteBuffers from heap which meaning the GC pressure may increase again. Of course, an
047 * better way is increasing the ByteBufferPool size if we detected this case. <br/>
048 * <br/>
049 * On the other hand, for better memory utilization, we have set an lower bound named
050 * minSizeForReservoirUse in this allocator, and if the desired size is less than
051 * minSizeForReservoirUse, the allocator will just allocate the ByteBuffer from heap and let the JVM
052 * free its memory, because it's too wasting to allocate a single fixed-size ByteBuffer for some
053 * small objects. <br/>
054 * <br/>
055 * We recommend to use this class to allocate/free {@link ByteBuff} in the RPC layer or the entire
056 * read/write path, because it hide the details of memory management and its APIs are more friendly
057 * to the upper layer.
058 */
059@InterfaceAudience.Private
060public class ByteBuffAllocator {
061
062  private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);
063
064  // The on-heap allocator is mostly used for testing, but also some non-test usage, such as
065  // scanning snapshot, we won't have an RpcServer to initialize the allocator, so just use the
066  // default heap allocator, it will just allocate ByteBuffers from heap but wrapped by an ByteBuff.
067  public static final ByteBuffAllocator HEAP = ByteBuffAllocator.createOnHeap();
068
069  public static final String ALLOCATOR_POOL_ENABLED_KEY = "hbase.server.allocator.pool.enabled";
070
071  public static final String MAX_BUFFER_COUNT_KEY = "hbase.server.allocator.max.buffer.count";
072
073  public static final String BUFFER_SIZE_KEY = "hbase.server.allocator.buffer.size";
074
075  public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size";
076
077  /**
078   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
079   *   {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead.
080   */
081  @Deprecated
082  public static final String DEPRECATED_ALLOCATOR_POOL_ENABLED_KEY =
083      "hbase.ipc.server.reservoir.enabled";
084
085  /**
086   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
087   *   {@link ByteBuffAllocator#MAX_BUFFER_COUNT_KEY} instead.
088   */
089  @Deprecated
090  static final String DEPRECATED_MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.reservoir.initial.max";
091
092  /**
093   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
094   *   {@link ByteBuffAllocator#BUFFER_SIZE_KEY} instead.
095   */
096  @Deprecated
097  static final String DEPRECATED_BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
098
099  /**
100   * The hbase.ipc.server.reservoir.initial.max and hbase.ipc.server.reservoir.initial.buffer.size
101   * were introduced in HBase2.0.0, while in HBase3.0.0 the two config keys will be replaced by
102   * {@link ByteBuffAllocator#MAX_BUFFER_COUNT_KEY} and {@link ByteBuffAllocator#BUFFER_SIZE_KEY}.
103   * Also the hbase.ipc.server.reservoir.enabled will be replaced by
104   * hbase.server.allocator.pool.enabled. Keep the three old config keys here for HBase2.x
105   * compatibility.
106   */
107  static {
108    Configuration.addDeprecation(DEPRECATED_ALLOCATOR_POOL_ENABLED_KEY, ALLOCATOR_POOL_ENABLED_KEY);
109    Configuration.addDeprecation(DEPRECATED_MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT_KEY);
110    Configuration.addDeprecation(DEPRECATED_BUFFER_SIZE_KEY, BUFFER_SIZE_KEY);
111  }
112
113  /**
114   * There're some reasons why better to choose 65KB(rather than 64KB) as the default buffer size:
115   * <p>
116   * 1. Almost all of the data blocks have the block size: 64KB + delta, whose delta is very small,
117   * depends on the size of lastKeyValue. If we set buffer.size=64KB, then each block will be
118   * allocated as a MultiByteBuff: one 64KB DirectByteBuffer and delta bytes HeapByteBuffer, the
119   * HeapByteBuffer will increase the GC pressure. Ideally, we should let the data block to be
120   * allocated as a SingleByteBuff, it has simpler data structure, faster access speed, less heap
121   * usage.
122   * <p>
123   * 2. Since the blocks are MultiByteBuff when using buffer.size=64KB, so we have to calculate the
124   * checksum by an temp heap copying (see HBASE-21917), while if it's a SingleByteBuff, we can
125   * speed the checksum by calling the hadoop' checksum in native lib, which is more faster.
126   * <p>
127   * For performance comparison, please see HBASE-22483.
128   */
129  public static final int DEFAULT_BUFFER_SIZE = 65 * 1024;
130
131  public static final Recycler NONE = () -> {
132  };
133
134  public interface Recycler {
135    void free();
136  }
137
138  private final boolean reservoirEnabled;
139  private final int bufSize;
140  private final int maxBufCount;
141  private final AtomicInteger usedBufCount = new AtomicInteger(0);
142
143  private boolean maxPoolSizeInfoLevelLogged = false;
144
145  // If the desired size is at least this size, it'll allocated from ByteBufferPool, otherwise it'll
146  // allocated from heap for better utilization. We make this to be 1/6th of the pool buffer size.
147  private final int minSizeForReservoirUse;
148
149  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
150
151  // Metrics to track the pool allocation bytes and heap allocation bytes. If heap allocation
152  // bytes is increasing so much, then we may need to increase the max.buffer.count .
153  private final LongAdder poolAllocationBytes = new LongAdder();
154  private final LongAdder heapAllocationBytes = new LongAdder();
155  private long lastPoolAllocationBytes = 0;
156  private long lastHeapAllocationBytes = 0;
157
158  /**
159   * Initialize an {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if
160   * reservoir is enabled and the reservoir has enough buffers, otherwise the allocator will just
161   * allocate the insufficient buffers from on-heap to meet the requirement.
162   * @param conf which get the arguments to initialize the allocator.
163   * @param reservoirEnabled indicate whether the reservoir is enabled or disabled. NOTICE: if
164   *          reservoir is enabled, then we will use the pool allocator to allocate off-heap
165   *          ByteBuffers and use the HEAP allocator to allocate heap ByteBuffers. Otherwise if
166   *          reservoir is disabled then all allocations will happen in HEAP instance.
167   * @return ByteBuffAllocator to manage the byte buffers.
168   */
169  public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
170    if (conf.get(DEPRECATED_BUFFER_SIZE_KEY) != null
171        || conf.get(DEPRECATED_MAX_BUFFER_COUNT_KEY) != null) {
172      LOG.warn("The config keys {} and {} are deprecated now, instead please use {} and {}. In "
173            + "future release we will remove the two deprecated configs.",
174        DEPRECATED_BUFFER_SIZE_KEY, DEPRECATED_MAX_BUFFER_COUNT_KEY, BUFFER_SIZE_KEY,
175        MAX_BUFFER_COUNT_KEY);
176    }
177    int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
178    if (reservoirEnabled) {
179      // The max number of buffers to be pooled in the ByteBufferPool. The default value been
180      // selected based on the #handlers configured. When it is read request, 2 MB is the max size
181      // at which we will send back one RPC request. Means max we need 2 MB for creating the
182      // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
183      // include the heap size overhead of each cells also.) Considering 2 MB, we will need
184      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
185      // is by default 64 KB.
186      // In case of read request, at the end of the handler process, we will make the response
187      // cellblock and add the Call to connection's response Q and a single Responder thread takes
188      // connections and responses from that one by one and do the socket write. So there is chances
189      // that by the time a handler originated response is actually done writing to socket and so
190      // released the BBs it used, the handler might have processed one more read req. On an avg 2x
191      // we consider and consider that also for the max buffers to pool
192      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
193      int maxBuffCount =
194          conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
195            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
196      int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
197      return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
198    } else {
199      return HEAP;
200    }
201  }
202
203  /**
204   * Initialize an {@link ByteBuffAllocator} which only allocate ByteBuffer from on-heap, it's
205   * designed for testing purpose or disabled reservoir case.
206   * @return allocator to allocate on-heap ByteBuffer.
207   */
208  private static ByteBuffAllocator createOnHeap() {
209    return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
210  }
211
212  ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
213      int minSizeForReservoirUse) {
214    this.reservoirEnabled = reservoirEnabled;
215    this.maxBufCount = maxBufCount;
216    this.bufSize = bufSize;
217    this.minSizeForReservoirUse = minSizeForReservoirUse;
218  }
219
220  public boolean isReservoirEnabled() {
221    return reservoirEnabled;
222  }
223
224  public long getHeapAllocationBytes() {
225    return heapAllocationBytes.sum();
226  }
227
228  public long getPoolAllocationBytes() {
229    return poolAllocationBytes.sum();
230  }
231
232  public int getBufferSize() {
233    return this.bufSize;
234  }
235
236  public int getUsedBufferCount() {
237    return this.usedBufCount.intValue();
238  }
239
240  /**
241   * The {@link ConcurrentLinkedQueue#size()} is O(N) complexity and time-consuming, so DO NOT use
242   * the method except in UT.
243   */
244  public int getFreeBufferCount() {
245    return this.buffers.size();
246  }
247
248  public int getTotalBufferCount() {
249    return maxBufCount;
250  }
251
252  public static long getHeapAllocationBytes(ByteBuffAllocator... allocators) {
253    long heapAllocBytes = 0;
254    for (ByteBuffAllocator alloc : Sets.newHashSet(allocators)) {
255      heapAllocBytes += alloc.getHeapAllocationBytes();
256    }
257    return heapAllocBytes;
258  }
259
260  public static double getHeapAllocationRatio(ByteBuffAllocator... allocators) {
261    double heapDelta = 0.0, poolDelta = 0.0;
262    long heapAllocBytes, poolAllocBytes;
263    // If disabled the pool allocator, then we use the global HEAP allocator. otherwise we use
264    // the pool allocator to allocate offheap ByteBuffers and use the HEAP to allocate heap
265    // ByteBuffers. So here we use a HashSet to remove the duplicated allocator object in disable
266    // case.
267    for (ByteBuffAllocator alloc : Sets.newHashSet(allocators)) {
268      heapAllocBytes = alloc.heapAllocationBytes.sum();
269      poolAllocBytes = alloc.poolAllocationBytes.sum();
270      heapDelta += (heapAllocBytes - alloc.lastHeapAllocationBytes);
271      poolDelta += (poolAllocBytes - alloc.lastPoolAllocationBytes);
272      alloc.lastHeapAllocationBytes = heapAllocBytes;
273      alloc.lastPoolAllocationBytes = poolAllocBytes;
274    }
275    // Calculate the heap allocation ratio.
276    if (Math.abs(heapDelta + poolDelta) < 1e-3) {
277      return 0.0;
278    }
279    return heapDelta / (heapDelta + poolDelta);
280  }
281
282  /**
283   * Allocate an buffer with buffer size from ByteBuffAllocator, Note to call the
284   * {@link ByteBuff#release()} if no need any more, otherwise the memory leak happen in NIO
285   * ByteBuffer pool.
286   * @return an ByteBuff with the buffer size.
287   */
288  public SingleByteBuff allocateOneBuffer() {
289    if (isReservoirEnabled()) {
290      ByteBuffer bb = getBuffer();
291      if (bb != null) {
292        return new SingleByteBuff(() -> putbackBuffer(bb), bb);
293      }
294    }
295    // Allocated from heap, let the JVM free its memory.
296    return (SingleByteBuff) ByteBuff.wrap(allocateOnHeap(bufSize));
297  }
298
299  private ByteBuffer allocateOnHeap(int size) {
300    heapAllocationBytes.add(size);
301    return ByteBuffer.allocate(size);
302  }
303
304  /**
305   * Allocate size bytes from the ByteBufAllocator, Note to call the {@link ByteBuff#release()} if
306   * no need any more, otherwise the memory leak happen in NIO ByteBuffer pool.
307   * @param size to allocate
308   * @return an ByteBuff with the desired size.
309   */
310  public ByteBuff allocate(int size) {
311    if (size < 0) {
312      throw new IllegalArgumentException("size to allocate should >=0");
313    }
314    // If disabled the reservoir, just allocate it from on-heap.
315    if (!isReservoirEnabled() || size == 0) {
316      return ByteBuff.wrap(allocateOnHeap(size));
317    }
318    int reminder = size % bufSize;
319    int len = size / bufSize + (reminder > 0 ? 1 : 0);
320    List<ByteBuffer> bbs = new ArrayList<>(len);
321    // Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
322    // reservoir is exhausted.
323    int remain = size;
324    while (remain >= minSizeForReservoirUse) {
325      ByteBuffer bb = this.getBuffer();
326      if (bb == null) {
327        break;
328      }
329      bbs.add(bb);
330      remain -= bufSize;
331    }
332    int lenFromReservoir = bbs.size();
333    if (remain > 0) {
334      // If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
335      // just allocate the ByteBuffer from on-heap.
336      bbs.add(allocateOnHeap(remain));
337    }
338    ByteBuff bb = ByteBuff.wrap(bbs, () -> {
339      for (int i = 0; i < lenFromReservoir; i++) {
340        this.putbackBuffer(bbs.get(i));
341      }
342    });
343    bb.limit(size);
344    return bb;
345  }
346
347  /**
348   * Free all direct buffers if allocated, mainly used for testing.
349   */
350  public void clean() {
351    while (!buffers.isEmpty()) {
352      ByteBuffer b = buffers.poll();
353      if (b instanceof DirectBuffer) {
354        DirectBuffer db = (DirectBuffer) b;
355        if (db.cleaner() != null) {
356          db.cleaner().clean();
357        }
358      }
359    }
360    this.usedBufCount.set(0);
361    this.maxPoolSizeInfoLevelLogged = false;
362    this.poolAllocationBytes.reset();
363    this.heapAllocationBytes.reset();
364    this.lastPoolAllocationBytes = 0;
365    this.lastHeapAllocationBytes = 0;
366  }
367
368  /**
369   * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
370   *         the maximum pool size, it will create a new one and return. In case of max pool size
371   *         also reached, will return null. When pool returned a ByteBuffer, make sure to return it
372   *         back to pool after use.
373   */
374  private ByteBuffer getBuffer() {
375    ByteBuffer bb = buffers.poll();
376    if (bb != null) {
377      // To reset the limit to capacity and position to 0, must clear here.
378      bb.clear();
379      poolAllocationBytes.add(bufSize);
380      return bb;
381    }
382    while (true) {
383      int c = this.usedBufCount.intValue();
384      if (c >= this.maxBufCount) {
385        if (!maxPoolSizeInfoLevelLogged) {
386          LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
387              + "increasing the value for '{}' ?",
388            maxBufCount, MAX_BUFFER_COUNT_KEY);
389          maxPoolSizeInfoLevelLogged = true;
390        }
391        return null;
392      }
393      if (!this.usedBufCount.compareAndSet(c, c + 1)) {
394        continue;
395      }
396      poolAllocationBytes.add(bufSize);
397      return ByteBuffer.allocateDirect(bufSize);
398    }
399  }
400
401  /**
402   * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
403   * @param buf ByteBuffer to return.
404   */
405  private void putbackBuffer(ByteBuffer buf) {
406    if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
407      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
408      return;
409    }
410    buffers.offer(buf);
411  }
412}