001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io.hfile.bucket;
020
021import java.util.Arrays;
022import java.util.Comparator;
023import java.util.HashSet;
024import java.util.Iterator;
025import java.util.Map;
026import java.util.Queue;
027import java.util.Set;
028import java.util.concurrent.atomic.LongAdder;
029import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
030import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
038import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
039import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.LinkedMap;
040
041/**
042 * This class is used to allocate a block with specified size and free the block when evicting. It
043 * manages an array of buckets, each bucket is associated with a size and caches elements up to this
044 * size. For a completely empty bucket, this size could be re-specified dynamically.
 * <p>
046 * This class is not thread safe.
047 */
048@InterfaceAudience.Private
049public final class BucketAllocator {
050  private static final Logger LOG = LoggerFactory.getLogger(BucketAllocator.class);
051
052  public final static class Bucket {
053    private long baseOffset;
054    private int itemAllocationSize, sizeIndex;
055    private int itemCount;
056    private int freeList[];
057    private int freeCount, usedCount;
058
059    public Bucket(long offset) {
060      baseOffset = offset;
061      sizeIndex = -1;
062    }
063
064    void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
065      Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
066      this.sizeIndex = sizeIndex;
067      itemAllocationSize = bucketSizes[sizeIndex];
068      itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
069      freeCount = itemCount;
070      usedCount = 0;
071      freeList = new int[itemCount];
072      for (int i = 0; i < freeCount; ++i)
073        freeList[i] = i;
074    }
075
076    public boolean isUninstantiated() {
077      return sizeIndex == -1;
078    }
079
080    public int sizeIndex() {
081      return sizeIndex;
082    }
083
084    public int getItemAllocationSize() {
085      return itemAllocationSize;
086    }
087
088    public boolean hasFreeSpace() {
089      return freeCount > 0;
090    }
091
092    public boolean isCompletelyFree() {
093      return usedCount == 0;
094    }
095
096    public int freeCount() {
097      return freeCount;
098    }
099
100    public int usedCount() {
101      return usedCount;
102    }
103
104    public int getFreeBytes() {
105      return freeCount * itemAllocationSize;
106    }
107
108    public int getUsedBytes() {
109      return usedCount * itemAllocationSize;
110    }
111
112    public long getBaseOffset() {
113      return baseOffset;
114    }
115
116    /**
117     * Allocate a block in this bucket, return the offset representing the
118     * position in physical space
119     * @return the offset in the IOEngine
120     */
121    public long allocate() {
122      assert freeCount > 0; // Else should not have been called
123      assert sizeIndex != -1;
124      ++usedCount;
125      long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
126      assert offset >= 0;
127      return offset;
128    }
129
130    public void addAllocation(long offset) throws BucketAllocatorException {
131      offset -= baseOffset;
132      if (offset < 0 || offset % itemAllocationSize != 0)
133        throw new BucketAllocatorException(
134            "Attempt to add allocation for bad offset: " + offset + " base="
135                + baseOffset + ", bucket size=" + itemAllocationSize);
136      int idx = (int) (offset / itemAllocationSize);
137      boolean matchFound = false;
138      for (int i = 0; i < freeCount; ++i) {
139        if (matchFound) freeList[i - 1] = freeList[i];
140        else if (freeList[i] == idx) matchFound = true;
141      }
142      if (!matchFound)
143        throw new BucketAllocatorException("Couldn't find match for index "
144            + idx + " in free list");
145      ++usedCount;
146      --freeCount;
147    }
148
149    private void free(long offset) {
150      offset -= baseOffset;
151      assert offset >= 0;
152      assert offset < itemCount * itemAllocationSize;
153      assert offset % itemAllocationSize == 0;
154      assert usedCount > 0;
155      assert freeCount < itemCount; // Else duplicate free
156      int item = (int) (offset / (long) itemAllocationSize);
157      assert !freeListContains(item);
158      --usedCount;
159      freeList[freeCount++] = item;
160    }
161
162    private boolean freeListContains(int blockNo) {
163      for (int i = 0; i < freeCount; ++i) {
164        if (freeList[i] == blockNo) return true;
165      }
166      return false;
167    }
168  }
169
170  final class BucketSizeInfo {
171    // Free bucket means it has space to allocate a block;
172    // Completely free bucket means it has no block.
173    private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
174    private int sizeIndex;
175
176    BucketSizeInfo(int sizeIndex) {
177      bucketList = new LinkedMap();
178      freeBuckets = new LinkedMap();
179      completelyFreeBuckets = new LinkedMap();
180      this.sizeIndex = sizeIndex;
181    }
182
183    public synchronized void instantiateBucket(Bucket b) {
184      assert b.isUninstantiated() || b.isCompletelyFree();
185      b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
186      bucketList.put(b, b);
187      freeBuckets.put(b, b);
188      completelyFreeBuckets.put(b, b);
189    }
190
191    public int sizeIndex() {
192      return sizeIndex;
193    }
194
195    /**
196     * Find a bucket to allocate a block
197     * @return the offset in the IOEngine
198     */
199    public long allocateBlock() {
200      Bucket b = null;
201      if (freeBuckets.size() > 0) {
202        // Use up an existing one first...
203        b = (Bucket) freeBuckets.lastKey();
204      }
205      if (b == null) {
206        b = grabGlobalCompletelyFreeBucket();
207        if (b != null) instantiateBucket(b);
208      }
209      if (b == null) return -1;
210      long result = b.allocate();
211      blockAllocated(b);
212      return result;
213    }
214
215    void blockAllocated(Bucket b) {
216      if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
217      if (!b.hasFreeSpace()) freeBuckets.remove(b);
218    }
219
220    public Bucket findAndRemoveCompletelyFreeBucket() {
221      Bucket b = null;
222      assert bucketList.size() > 0;
223      if (bucketList.size() == 1) {
224        // So we never get complete starvation of a bucket for a size
225        return null;
226      }
227
228      if (completelyFreeBuckets.size() > 0) {
229        b = (Bucket) completelyFreeBuckets.firstKey();
230        removeBucket(b);
231      }
232      return b;
233    }
234
235    private synchronized void removeBucket(Bucket b) {
236      assert b.isCompletelyFree();
237      bucketList.remove(b);
238      freeBuckets.remove(b);
239      completelyFreeBuckets.remove(b);
240    }
241
242    public void freeBlock(Bucket b, long offset) {
243      assert bucketList.containsKey(b);
244      // else we shouldn't have anything to free...
245      assert (!completelyFreeBuckets.containsKey(b));
246      b.free(offset);
247      if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
248      if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
249    }
250
251    public synchronized IndexStatistics statistics() {
252      long free = 0, used = 0;
253      for (Object obj : bucketList.keySet()) {
254        Bucket b = (Bucket) obj;
255        free += b.freeCount();
256        used += b.usedCount();
257      }
258      return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
259    }
260
261    @Override
262    public String toString() {
263      return MoreObjects.toStringHelper(this.getClass())
264        .add("sizeIndex", sizeIndex)
265        .add("bucketSize", bucketSizes[sizeIndex])
266        .toString();
267    }
268  }
269
270  // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better
271  // reset it according to your cluster's block size distribution
272  // The real block size in hfile maybe a little larger than the size we configured ,
273  // so we need add extra 1024 bytes for fit.
274  // TODO Support the view of block size distribution statistics
275  private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
276      16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
277      56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
278      192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
279      512 * 1024 + 1024 };
280
281  /**
282   * Round up the given block size to bucket size, and get the corresponding
283   * BucketSizeInfo
284   */
285  public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
286    for (int i = 0; i < bucketSizes.length; ++i)
287      if (blockSize <= bucketSizes[i])
288        return bucketSizeInfos[i];
289    return null;
290  }
291
292  /**
293   * So, what is the minimum amount of items we'll tolerate in a single bucket?
294   */
295  static public final int FEWEST_ITEMS_IN_BUCKET = 4;
296
297  private final int[] bucketSizes;
298  private final int bigItemSize;
299  // The capacity size for each bucket
300  private final long bucketCapacity;
301  private Bucket[] buckets;
302  private BucketSizeInfo[] bucketSizeInfos;
303  private final long totalSize;
304  private transient long usedSize = 0;
305
306  BucketAllocator(long availableSpace, int[] bucketSizes)
307      throws BucketAllocatorException {
308    this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
309    Arrays.sort(this.bucketSizes);
310    this.bigItemSize = Ints.max(this.bucketSizes);
311    this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * (long) bigItemSize;
312    buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
313    if (buckets.length < this.bucketSizes.length)
314      throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length +
315        "); must have room for at least " + this.bucketSizes.length + " buckets");
316    bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
317    for (int i = 0; i < this.bucketSizes.length; ++i) {
318      bucketSizeInfos[i] = new BucketSizeInfo(i);
319    }
320    for (int i = 0; i < buckets.length; ++i) {
321      buckets[i] = new Bucket(bucketCapacity * i);
322      bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
323          .instantiateBucket(buckets[i]);
324    }
325    this.totalSize = ((long) buckets.length) * bucketCapacity;
326    if (LOG.isInfoEnabled()) {
327      LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length +
328        ", bucket capacity=" + this.bucketCapacity +
329        "=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" +
330        "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))");
331    }
332  }
333
334  /**
335   * Rebuild the allocator's data structures from a persisted map.
336   * @param availableSpace capacity of cache
337   * @param map A map stores the block key and BucketEntry(block's meta data
338   *          like offset, length)
339   * @param realCacheSize cached data size statistics for bucket cache
340   * @throws BucketAllocatorException
341   */
342  BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
343      LongAdder realCacheSize) throws BucketAllocatorException {
344    this(availableSpace, bucketSizes);
345
346    // each bucket has an offset, sizeindex. probably the buckets are too big
347    // in our default state. so what we do is reconfigure them according to what
348    // we've found. we can only reconfigure each bucket once; if more than once,
349    // we know there's a bug, so we just log the info, throw, and start again...
350    boolean[] reconfigured = new boolean[buckets.length];
351    int sizeNotMatchedCount = 0;
352    int insufficientCapacityCount = 0;
353    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
354    while (iterator.hasNext()) {
355      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
356      long foundOffset = entry.getValue().offset();
357      int foundLen = entry.getValue().getLength();
358      int bucketSizeIndex = -1;
359      for (int i = 0; i < this.bucketSizes.length; ++i) {
360        if (foundLen <= this.bucketSizes[i]) {
361          bucketSizeIndex = i;
362          break;
363        }
364      }
365      if (bucketSizeIndex == -1) {
366        sizeNotMatchedCount++;
367        iterator.remove();
368        continue;
369      }
370      int bucketNo = (int) (foundOffset / bucketCapacity);
371      if (bucketNo < 0 || bucketNo >= buckets.length) {
372        insufficientCapacityCount++;
373        iterator.remove();
374        continue;
375      }
376      Bucket b = buckets[bucketNo];
377      if (reconfigured[bucketNo]) {
378        if (b.sizeIndex() != bucketSizeIndex) {
379          throw new BucketAllocatorException("Inconsistent allocation in bucket map;");
380        }
381      } else {
382        if (!b.isCompletelyFree()) {
383          throw new BucketAllocatorException(
384              "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data");
385        }
386        // Need to remove the bucket from whichever list it's currently in at
387        // the moment...
388        BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
389        BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
390        oldbsi.removeBucket(b);
391        bsi.instantiateBucket(b);
392        reconfigured[bucketNo] = true;
393      }
394      realCacheSize.add(foundLen);
395      buckets[bucketNo].addAllocation(foundOffset);
396      usedSize += buckets[bucketNo].getItemAllocationSize();
397      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
398    }
399
400    if (sizeNotMatchedCount > 0) {
401      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " +
402        "there is no matching bucket size for these blocks");
403    }
404    if (insufficientCapacityCount > 0) {
405      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
406        + "did you shrink the cache?");
407    }
408  }
409
410  @Override
411  public String toString() {
412    StringBuilder sb = new StringBuilder(1024);
413    for (int i = 0; i < buckets.length; ++i) {
414      Bucket b = buckets[i];
415      if (i > 0) sb.append(", ");
416      sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
417      sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
418    }
419    return sb.toString();
420  }
421
422  public long getUsedSize() {
423    return this.usedSize;
424  }
425
426  public long getFreeSize() {
427    return this.totalSize - getUsedSize();
428  }
429
430  public long getTotalSize() {
431    return this.totalSize;
432  }
433
434  /**
435   * Allocate a block with specified size. Return the offset
436   * @param blockSize size of block
437   * @throws BucketAllocatorException
438   * @throws CacheFullException
439   * @return the offset in the IOEngine
440   */
441  public synchronized long allocateBlock(int blockSize) throws CacheFullException,
442      BucketAllocatorException {
443    assert blockSize > 0;
444    BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
445    if (bsi == null) {
446      throw new BucketAllocatorException("Allocation too big size=" + blockSize +
447        "; adjust BucketCache sizes " + BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY +
448        " to accomodate if size seems reasonable and you want it cached.");
449    }
450    long offset = bsi.allocateBlock();
451
452    // Ask caller to free up space and try again!
453    if (offset < 0)
454      throw new CacheFullException(blockSize, bsi.sizeIndex());
455    usedSize += bucketSizes[bsi.sizeIndex()];
456    return offset;
457  }
458
459  private Bucket grabGlobalCompletelyFreeBucket() {
460    for (BucketSizeInfo bsi : bucketSizeInfos) {
461      Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
462      if (b != null) return b;
463    }
464    return null;
465  }
466
467  /**
468   * Free a block with the offset
469   * @param offset block's offset
470   * @return size freed
471   */
472  public synchronized int freeBlock(long offset) {
473    int bucketNo = (int) (offset / bucketCapacity);
474    assert bucketNo >= 0 && bucketNo < buckets.length;
475    Bucket targetBucket = buckets[bucketNo];
476    bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
477    usedSize -= targetBucket.getItemAllocationSize();
478    return targetBucket.getItemAllocationSize();
479  }
480
481  public int sizeIndexOfAllocation(long offset) {
482    int bucketNo = (int) (offset / bucketCapacity);
483    assert bucketNo >= 0 && bucketNo < buckets.length;
484    Bucket targetBucket = buckets[bucketNo];
485    return targetBucket.sizeIndex();
486  }
487
488  public int sizeOfAllocation(long offset) {
489    int bucketNo = (int) (offset / bucketCapacity);
490    assert bucketNo >= 0 && bucketNo < buckets.length;
491    Bucket targetBucket = buckets[bucketNo];
492    return targetBucket.getItemAllocationSize();
493  }
494
495  static class IndexStatistics {
496    private long freeCount, usedCount, itemSize, totalCount;
497
498    public long freeCount() {
499      return freeCount;
500    }
501
502    public long usedCount() {
503      return usedCount;
504    }
505
506    public long totalCount() {
507      return totalCount;
508    }
509
510    public long freeBytes() {
511      return freeCount * itemSize;
512    }
513
514    public long usedBytes() {
515      return usedCount * itemSize;
516    }
517
518    public long totalBytes() {
519      return totalCount * itemSize;
520    }
521
522    public long itemSize() {
523      return itemSize;
524    }
525
526    public IndexStatistics(long free, long used, long itemSize) {
527      setTo(free, used, itemSize);
528    }
529
530    public IndexStatistics() {
531      setTo(-1, -1, 0);
532    }
533
534    public void setTo(long free, long used, long itemSize) {
535      this.itemSize = itemSize;
536      this.freeCount = free;
537      this.usedCount = used;
538      this.totalCount = free + used;
539    }
540  }
541
542  public Bucket [] getBuckets() {
543    return this.buckets;
544  }
545
546  void logStatistics() {
547    IndexStatistics total = new IndexStatistics();
548    IndexStatistics[] stats = getIndexStatistics(total);
549    LOG.info("Bucket allocator statistics follow:\n");
550    LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
551        + total.usedBytes() + "; total bytes=" + total.totalBytes());
552    for (IndexStatistics s : stats) {
553      LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
554          + "; free=" + s.freeCount() + "; total=" + s.totalCount());
555    }
556  }
557
558  IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
559    IndexStatistics[] stats = getIndexStatistics();
560    long totalfree = 0, totalused = 0;
561    for (IndexStatistics stat : stats) {
562      totalfree += stat.freeBytes();
563      totalused += stat.usedBytes();
564    }
565    grandTotal.setTo(totalfree, totalused, 1);
566    return stats;
567  }
568
569  IndexStatistics[] getIndexStatistics() {
570    IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
571    for (int i = 0; i < stats.length; ++i)
572      stats[i] = bucketSizeInfos[i].statistics();
573    return stats;
574  }
575
576  public long freeBlock(long freeList[]) {
577    long sz = 0;
578    for (int i = 0; i < freeList.length; ++i)
579      sz += freeBlock(freeList[i]);
580    return sz;
581  }
582
583  public int getBucketIndex(long offset) {
584    return (int) (offset / bucketCapacity);
585  }
586
587  /**
588   * Returns a set of indices of the buckets that are least filled
589   * excluding the offsets, we also the fully free buckets for the
590   * BucketSizes where everything is empty and they only have one
591   * completely free bucket as a reserved
592   *
593   * @param excludedBuckets the buckets that need to be excluded due to
594   *                        currently being in used
595   * @param bucketCount     max Number of buckets to return
596   * @return set of bucket indices which could be used for eviction
597   */
598  public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
599                                            int bucketCount) {
600    Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy(
601        new Comparator<Integer>() {
602          @Override
603          public int compare(Integer left, Integer right) {
604            // We will always get instantiated buckets
605            return Float.compare(
606                ((float) buckets[left].usedCount) / buckets[left].itemCount,
607                ((float) buckets[right].usedCount) / buckets[right].itemCount);
608          }
609        }).maximumSize(bucketCount).create();
610
611    for (int i = 0; i < buckets.length; i ++ ) {
612      if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
613          // Avoid the buckets that are the only buckets for a sizeIndex
614          bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
615        queue.add(i);
616      }
617    }
618
619    Set<Integer> result = new HashSet<>(bucketCount);
620    result.addAll(queue);
621
622    return result;
623  }
624}