001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io;
020
021import java.nio.ByteBuffer;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Queue;
025import java.util.concurrent.ConcurrentLinkedQueue;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.atomic.LongAdder;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.nio.ByteBuff;
032import org.apache.hadoop.hbase.nio.SingleByteBuff;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
038import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
039
040import sun.nio.ch.DirectBuffer;
041
042/**
043 * ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and
044 * it provide high-level interfaces for upstream. when allocating desired memory size, it will
045 * return {@link ByteBuff}, if we are sure that those ByteBuffers have reached the end of life
046 * cycle, we must do the {@link ByteBuff#release()} to return back the buffers to the pool,
047 * otherwise ByteBuffers leak will happen, and the NIO ByteBuffer pool may be exhausted. there's
048 * possible that the desired memory size is large than ByteBufferPool has, we'll downgrade to
049 * allocate ByteBuffers from heap which meaning the GC pressure may increase again. Of course, an
050 * better way is increasing the ByteBufferPool size if we detected this case. <br/>
051 * <br/>
052 * On the other hand, for better memory utilization, we have set an lower bound named
053 * minSizeForReservoirUse in this allocator, and if the desired size is less than
054 * minSizeForReservoirUse, the allocator will just allocate the ByteBuffer from heap and let the JVM
055 * free its memory, because it's too wasting to allocate a single fixed-size ByteBuffer for some
056 * small objects. <br/>
057 * <br/>
058 * We recommend to use this class to allocate/free {@link ByteBuff} in the RPC layer or the entire
059 * read/write path, because it hide the details of memory management and its APIs are more friendly
060 * to the upper layer.
061 */
062@InterfaceAudience.Private
063public class ByteBuffAllocator {
064
065  private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);
066
067  // The on-heap allocator is mostly used for testing, but also some non-test usage, such as
068  // scanning snapshot, we won't have an RpcServer to initialize the allocator, so just use the
069  // default heap allocator, it will just allocate ByteBuffers from heap but wrapped by an ByteBuff.
070  public static final ByteBuffAllocator HEAP = ByteBuffAllocator.createOnHeap();
071
072  public static final String ALLOCATOR_POOL_ENABLED_KEY = "hbase.server.allocator.pool.enabled";
073
074  public static final String MAX_BUFFER_COUNT_KEY = "hbase.server.allocator.max.buffer.count";
075
076  public static final String BUFFER_SIZE_KEY = "hbase.server.allocator.buffer.size";
077
078  public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size";
079
080  /**
081   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
082   *   {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead.
083   */
084  @Deprecated
085  public static final String DEPRECATED_ALLOCATOR_POOL_ENABLED_KEY =
086      "hbase.ipc.server.reservoir.enabled";
087
088  /**
089   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
090   *   {@link ByteBuffAllocator#MAX_BUFFER_COUNT_KEY} instead.
091   */
092  @Deprecated
093  static final String DEPRECATED_MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.reservoir.initial.max";
094
095  /**
096   * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
097   *   {@link ByteBuffAllocator#BUFFER_SIZE_KEY} instead.
098   */
099  @Deprecated
100  static final String DEPRECATED_BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
101
102  /**
103   * The hbase.ipc.server.reservoir.initial.max and hbase.ipc.server.reservoir.initial.buffer.size
104   * were introduced in HBase2.0.0, while in HBase3.0.0 the two config keys will be replaced by
105   * {@link ByteBuffAllocator#MAX_BUFFER_COUNT_KEY} and {@link ByteBuffAllocator#BUFFER_SIZE_KEY}.
106   * Also the hbase.ipc.server.reservoir.enabled will be replaced by
107   * hbase.server.allocator.pool.enabled. Keep the three old config keys here for HBase2.x
108   * compatibility.
109   */
110  static {
111    Configuration.addDeprecation(DEPRECATED_ALLOCATOR_POOL_ENABLED_KEY, ALLOCATOR_POOL_ENABLED_KEY);
112    Configuration.addDeprecation(DEPRECATED_MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT_KEY);
113    Configuration.addDeprecation(DEPRECATED_BUFFER_SIZE_KEY, BUFFER_SIZE_KEY);
114  }
115
116  /**
117   * There're some reasons why better to choose 65KB(rather than 64KB) as the default buffer size:
118   * <p>
119   * 1. Almost all of the data blocks have the block size: 64KB + delta, whose delta is very small,
120   * depends on the size of lastKeyValue. If we set buffer.size=64KB, then each block will be
121   * allocated as a MultiByteBuff: one 64KB DirectByteBuffer and delta bytes HeapByteBuffer, the
122   * HeapByteBuffer will increase the GC pressure. Ideally, we should let the data block to be
123   * allocated as a SingleByteBuff, it has simpler data structure, faster access speed, less heap
124   * usage.
125   * <p>
126   * 2. Since the blocks are MultiByteBuff when using buffer.size=64KB, so we have to calculate the
127   * checksum by an temp heap copying (see HBASE-21917), while if it's a SingleByteBuff, we can
128   * speed the checksum by calling the hadoop' checksum in native lib, which is more faster.
129   * <p>
130   * For performance comparison, please see HBASE-22483.
131   */
132  public static final int DEFAULT_BUFFER_SIZE = 65 * 1024;
133
134  public static final Recycler NONE = () -> {
135  };
136
137  public interface Recycler {
138    void free();
139  }
140
141  private final boolean reservoirEnabled;
142  private final int bufSize;
143  private final int maxBufCount;
144  private final AtomicInteger usedBufCount = new AtomicInteger(0);
145
146  private boolean maxPoolSizeInfoLevelLogged = false;
147
148  // If the desired size is at least this size, it'll allocated from ByteBufferPool, otherwise it'll
149  // allocated from heap for better utilization. We make this to be 1/6th of the pool buffer size.
150  private final int minSizeForReservoirUse;
151
152  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
153
154  // Metrics to track the pool allocation bytes and heap allocation bytes. If heap allocation
155  // bytes is increasing so much, then we may need to increase the max.buffer.count .
156  private final LongAdder poolAllocationBytes = new LongAdder();
157  private final LongAdder heapAllocationBytes = new LongAdder();
158  private long lastPoolAllocationBytes = 0;
159  private long lastHeapAllocationBytes = 0;
160
161  /**
162   * Initialize an {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if
163   * reservoir is enabled and the reservoir has enough buffers, otherwise the allocator will just
164   * allocate the insufficient buffers from on-heap to meet the requirement.
165   * @param conf which get the arguments to initialize the allocator.
166   * @param reservoirEnabled indicate whether the reservoir is enabled or disabled. NOTICE: if
167   *          reservoir is enabled, then we will use the pool allocator to allocate off-heap
168   *          ByteBuffers and use the HEAP allocator to allocate heap ByteBuffers. Otherwise if
169   *          reservoir is disabled then all allocations will happen in HEAP instance.
170   * @return ByteBuffAllocator to manage the byte buffers.
171   */
172  public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
173    if (conf.get(DEPRECATED_BUFFER_SIZE_KEY) != null
174        || conf.get(DEPRECATED_MAX_BUFFER_COUNT_KEY) != null) {
175      LOG.warn("The config keys {} and {} are deprecated now, instead please use {} and {}. In "
176            + "future release we will remove the two deprecated configs.",
177        DEPRECATED_BUFFER_SIZE_KEY, DEPRECATED_MAX_BUFFER_COUNT_KEY, BUFFER_SIZE_KEY,
178        MAX_BUFFER_COUNT_KEY);
179    }
180    int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
181    if (reservoirEnabled) {
182      // The max number of buffers to be pooled in the ByteBufferPool. The default value been
183      // selected based on the #handlers configured. When it is read request, 2 MB is the max size
184      // at which we will send back one RPC request. Means max we need 2 MB for creating the
185      // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
186      // include the heap size overhead of each cells also.) Considering 2 MB, we will need
187      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
188      // is by default 64 KB.
189      // In case of read request, at the end of the handler process, we will make the response
190      // cellblock and add the Call to connection's response Q and a single Responder thread takes
191      // connections and responses from that one by one and do the socket write. So there is chances
192      // that by the time a handler originated response is actually done writing to socket and so
193      // released the BBs it used, the handler might have processed one more read req. On an avg 2x
194      // we consider and consider that also for the max buffers to pool
195      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
196      int maxBuffCount =
197          conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
198            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
199      int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
200      return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
201    } else {
202      return HEAP;
203    }
204  }
205
206  /**
207   * Initialize an {@link ByteBuffAllocator} which only allocate ByteBuffer from on-heap, it's
208   * designed for testing purpose or disabled reservoir case.
209   * @return allocator to allocate on-heap ByteBuffer.
210   */
211  private static ByteBuffAllocator createOnHeap() {
212    return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
213  }
214
215  @VisibleForTesting
216  ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
217      int minSizeForReservoirUse) {
218    this.reservoirEnabled = reservoirEnabled;
219    this.maxBufCount = maxBufCount;
220    this.bufSize = bufSize;
221    this.minSizeForReservoirUse = minSizeForReservoirUse;
222  }
223
224  public boolean isReservoirEnabled() {
225    return reservoirEnabled;
226  }
227
228  public long getHeapAllocationBytes() {
229    return heapAllocationBytes.sum();
230  }
231
232  public long getPoolAllocationBytes() {
233    return poolAllocationBytes.sum();
234  }
235
236  public int getBufferSize() {
237    return this.bufSize;
238  }
239
240  public int getUsedBufferCount() {
241    return this.usedBufCount.intValue();
242  }
243
244  /**
245   * The {@link ConcurrentLinkedQueue#size()} is O(N) complexity and time-consuming, so DO NOT use
246   * the method except in UT.
247   */
248  @VisibleForTesting
249  public int getFreeBufferCount() {
250    return this.buffers.size();
251  }
252
253  public int getTotalBufferCount() {
254    return maxBufCount;
255  }
256
257  public static long getHeapAllocationBytes(ByteBuffAllocator... allocators) {
258    long heapAllocBytes = 0;
259    for (ByteBuffAllocator alloc : Sets.newHashSet(allocators)) {
260      heapAllocBytes += alloc.getHeapAllocationBytes();
261    }
262    return heapAllocBytes;
263  }
264
265  public static double getHeapAllocationRatio(ByteBuffAllocator... allocators) {
266    double heapDelta = 0.0, poolDelta = 0.0;
267    long heapAllocBytes, poolAllocBytes;
268    // If disabled the pool allocator, then we use the global HEAP allocator. otherwise we use
269    // the pool allocator to allocate offheap ByteBuffers and use the HEAP to allocate heap
270    // ByteBuffers. So here we use a HashSet to remove the duplicated allocator object in disable
271    // case.
272    for (ByteBuffAllocator alloc : Sets.newHashSet(allocators)) {
273      heapAllocBytes = alloc.heapAllocationBytes.sum();
274      poolAllocBytes = alloc.poolAllocationBytes.sum();
275      heapDelta += (heapAllocBytes - alloc.lastHeapAllocationBytes);
276      poolDelta += (poolAllocBytes - alloc.lastPoolAllocationBytes);
277      alloc.lastHeapAllocationBytes = heapAllocBytes;
278      alloc.lastPoolAllocationBytes = poolAllocBytes;
279    }
280    // Calculate the heap allocation ratio.
281    if (Math.abs(heapDelta + poolDelta) < 1e-3) {
282      return 0.0;
283    }
284    return heapDelta / (heapDelta + poolDelta);
285  }
286
287  /**
288   * Allocate an buffer with buffer size from ByteBuffAllocator, Note to call the
289   * {@link ByteBuff#release()} if no need any more, otherwise the memory leak happen in NIO
290   * ByteBuffer pool.
291   * @return an ByteBuff with the buffer size.
292   */
293  public SingleByteBuff allocateOneBuffer() {
294    if (isReservoirEnabled()) {
295      ByteBuffer bb = getBuffer();
296      if (bb != null) {
297        return new SingleByteBuff(() -> putbackBuffer(bb), bb);
298      }
299    }
300    // Allocated from heap, let the JVM free its memory.
301    return (SingleByteBuff) ByteBuff.wrap(allocateOnHeap(bufSize));
302  }
303
304  private ByteBuffer allocateOnHeap(int size) {
305    heapAllocationBytes.add(size);
306    return ByteBuffer.allocate(size);
307  }
308
309  /**
310   * Allocate size bytes from the ByteBufAllocator, Note to call the {@link ByteBuff#release()} if
311   * no need any more, otherwise the memory leak happen in NIO ByteBuffer pool.
312   * @param size to allocate
313   * @return an ByteBuff with the desired size.
314   */
315  public ByteBuff allocate(int size) {
316    if (size < 0) {
317      throw new IllegalArgumentException("size to allocate should >=0");
318    }
319    // If disabled the reservoir, just allocate it from on-heap.
320    if (!isReservoirEnabled() || size == 0) {
321      return ByteBuff.wrap(allocateOnHeap(size));
322    }
323    int reminder = size % bufSize;
324    int len = size / bufSize + (reminder > 0 ? 1 : 0);
325    List<ByteBuffer> bbs = new ArrayList<>(len);
326    // Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
327    // reservoir is exhausted.
328    int remain = size;
329    while (remain >= minSizeForReservoirUse) {
330      ByteBuffer bb = this.getBuffer();
331      if (bb == null) {
332        break;
333      }
334      bbs.add(bb);
335      remain -= bufSize;
336    }
337    int lenFromReservoir = bbs.size();
338    if (remain > 0) {
339      // If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
340      // just allocate the ByteBuffer from on-heap.
341      bbs.add(allocateOnHeap(remain));
342    }
343    ByteBuff bb = ByteBuff.wrap(bbs, () -> {
344      for (int i = 0; i < lenFromReservoir; i++) {
345        this.putbackBuffer(bbs.get(i));
346      }
347    });
348    bb.limit(size);
349    return bb;
350  }
351
352  /**
353   * Free all direct buffers if allocated, mainly used for testing.
354   */
355  @VisibleForTesting
356  public void clean() {
357    while (!buffers.isEmpty()) {
358      ByteBuffer b = buffers.poll();
359      if (b instanceof DirectBuffer) {
360        DirectBuffer db = (DirectBuffer) b;
361        if (db.cleaner() != null) {
362          db.cleaner().clean();
363        }
364      }
365    }
366    this.usedBufCount.set(0);
367    this.maxPoolSizeInfoLevelLogged = false;
368    this.poolAllocationBytes.reset();
369    this.heapAllocationBytes.reset();
370    this.lastPoolAllocationBytes = 0;
371    this.lastHeapAllocationBytes = 0;
372  }
373
374  /**
375   * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
376   *         the maximum pool size, it will create a new one and return. In case of max pool size
377   *         also reached, will return null. When pool returned a ByteBuffer, make sure to return it
378   *         back to pool after use.
379   */
380  private ByteBuffer getBuffer() {
381    ByteBuffer bb = buffers.poll();
382    if (bb != null) {
383      // To reset the limit to capacity and position to 0, must clear here.
384      bb.clear();
385      poolAllocationBytes.add(bufSize);
386      return bb;
387    }
388    while (true) {
389      int c = this.usedBufCount.intValue();
390      if (c >= this.maxBufCount) {
391        if (!maxPoolSizeInfoLevelLogged) {
392          LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
393              + "increasing the value for '{}' ?",
394            maxBufCount, MAX_BUFFER_COUNT_KEY);
395          maxPoolSizeInfoLevelLogged = true;
396        }
397        return null;
398      }
399      if (!this.usedBufCount.compareAndSet(c, c + 1)) {
400        continue;
401      }
402      poolAllocationBytes.add(bufSize);
403      return ByteBuffer.allocateDirect(bufSize);
404    }
405  }
406
407  /**
408   * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
409   * @param buf ByteBuffer to return.
410   */
411  private void putbackBuffer(ByteBuffer buf) {
412    if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
413      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
414      return;
415    }
416    buffers.offer(buf);
417  }
418}