001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.hbase.io.hfile.bucket;
022
023import java.util.Arrays;
024import java.util.Comparator;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.Map;
028import java.util.Queue;
029import java.util.Set;
030import java.util.concurrent.atomic.LongAdder;
031
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
036import org.apache.hadoop.hbase.io.hfile.CacheConfig;
037import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
038import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
039
040import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
041import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
042import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
043import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
044import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.LinkedMap;
045
046/**
047 * This class is used to allocate a block with specified size and free the block
048 * when evicting. It manages an array of buckets, each bucket is associated with
049 * a size and caches elements up to this size. For a completely empty bucket, this
050 * size could be re-specified dynamically.
051 *
052 * This class is not thread safe.
053 */
054@InterfaceAudience.Private
055@JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
056public final class BucketAllocator {
057  private static final Logger LOG = LoggerFactory.getLogger(BucketAllocator.class);
058
059  @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
060  public final static class Bucket {
061    private long baseOffset;
062    private int itemAllocationSize, sizeIndex;
063    private int itemCount;
064    private int freeList[];
065    private int freeCount, usedCount;
066
067    public Bucket(long offset) {
068      baseOffset = offset;
069      sizeIndex = -1;
070    }
071
072    void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
073      Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
074      this.sizeIndex = sizeIndex;
075      itemAllocationSize = bucketSizes[sizeIndex];
076      itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
077      freeCount = itemCount;
078      usedCount = 0;
079      freeList = new int[itemCount];
080      for (int i = 0; i < freeCount; ++i)
081        freeList[i] = i;
082    }
083
084    public boolean isUninstantiated() {
085      return sizeIndex == -1;
086    }
087
088    public int sizeIndex() {
089      return sizeIndex;
090    }
091
092    public int getItemAllocationSize() {
093      return itemAllocationSize;
094    }
095
096    public boolean hasFreeSpace() {
097      return freeCount > 0;
098    }
099
100    public boolean isCompletelyFree() {
101      return usedCount == 0;
102    }
103
104    public int freeCount() {
105      return freeCount;
106    }
107
108    public int usedCount() {
109      return usedCount;
110    }
111
112    public int getFreeBytes() {
113      return freeCount * itemAllocationSize;
114    }
115
116    public int getUsedBytes() {
117      return usedCount * itemAllocationSize;
118    }
119
120    public long getBaseOffset() {
121      return baseOffset;
122    }
123
124    /**
125     * Allocate a block in this bucket, return the offset representing the
126     * position in physical space
127     * @return the offset in the IOEngine
128     */
129    public long allocate() {
130      assert freeCount > 0; // Else should not have been called
131      assert sizeIndex != -1;
132      ++usedCount;
133      long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
134      assert offset >= 0;
135      return offset;
136    }
137
138    public void addAllocation(long offset) throws BucketAllocatorException {
139      offset -= baseOffset;
140      if (offset < 0 || offset % itemAllocationSize != 0)
141        throw new BucketAllocatorException(
142            "Attempt to add allocation for bad offset: " + offset + " base="
143                + baseOffset + ", bucket size=" + itemAllocationSize);
144      int idx = (int) (offset / itemAllocationSize);
145      boolean matchFound = false;
146      for (int i = 0; i < freeCount; ++i) {
147        if (matchFound) freeList[i - 1] = freeList[i];
148        else if (freeList[i] == idx) matchFound = true;
149      }
150      if (!matchFound)
151        throw new BucketAllocatorException("Couldn't find match for index "
152            + idx + " in free list");
153      ++usedCount;
154      --freeCount;
155    }
156
157    private void free(long offset) {
158      offset -= baseOffset;
159      assert offset >= 0;
160      assert offset < itemCount * itemAllocationSize;
161      assert offset % itemAllocationSize == 0;
162      assert usedCount > 0;
163      assert freeCount < itemCount; // Else duplicate free
164      int item = (int) (offset / (long) itemAllocationSize);
165      assert !freeListContains(item);
166      --usedCount;
167      freeList[freeCount++] = item;
168    }
169
170    private boolean freeListContains(int blockNo) {
171      for (int i = 0; i < freeCount; ++i) {
172        if (freeList[i] == blockNo) return true;
173      }
174      return false;
175    }
176  }
177
178  final class BucketSizeInfo {
179    // Free bucket means it has space to allocate a block;
180    // Completely free bucket means it has no block.
181    private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
182    private int sizeIndex;
183
184    BucketSizeInfo(int sizeIndex) {
185      bucketList = new LinkedMap();
186      freeBuckets = new LinkedMap();
187      completelyFreeBuckets = new LinkedMap();
188      this.sizeIndex = sizeIndex;
189    }
190
191    public synchronized void instantiateBucket(Bucket b) {
192      assert b.isUninstantiated() || b.isCompletelyFree();
193      b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
194      bucketList.put(b, b);
195      freeBuckets.put(b, b);
196      completelyFreeBuckets.put(b, b);
197    }
198
199    public int sizeIndex() {
200      return sizeIndex;
201    }
202
203    /**
204     * Find a bucket to allocate a block
205     * @return the offset in the IOEngine
206     */
207    public long allocateBlock() {
208      Bucket b = null;
209      if (freeBuckets.size() > 0) {
210        // Use up an existing one first...
211        b = (Bucket) freeBuckets.lastKey();
212      }
213      if (b == null) {
214        b = grabGlobalCompletelyFreeBucket();
215        if (b != null) instantiateBucket(b);
216      }
217      if (b == null) return -1;
218      long result = b.allocate();
219      blockAllocated(b);
220      return result;
221    }
222
223    void blockAllocated(Bucket b) {
224      if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
225      if (!b.hasFreeSpace()) freeBuckets.remove(b);
226    }
227
228    public Bucket findAndRemoveCompletelyFreeBucket() {
229      Bucket b = null;
230      assert bucketList.size() > 0;
231      if (bucketList.size() == 1) {
232        // So we never get complete starvation of a bucket for a size
233        return null;
234      }
235
236      if (completelyFreeBuckets.size() > 0) {
237        b = (Bucket) completelyFreeBuckets.firstKey();
238        removeBucket(b);
239      }
240      return b;
241    }
242
243    private synchronized void removeBucket(Bucket b) {
244      assert b.isCompletelyFree();
245      bucketList.remove(b);
246      freeBuckets.remove(b);
247      completelyFreeBuckets.remove(b);
248    }
249
250    public void freeBlock(Bucket b, long offset) {
251      assert bucketList.containsKey(b);
252      // else we shouldn't have anything to free...
253      assert (!completelyFreeBuckets.containsKey(b));
254      b.free(offset);
255      if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
256      if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
257    }
258
259    public synchronized IndexStatistics statistics() {
260      long free = 0, used = 0;
261      for (Object obj : bucketList.keySet()) {
262        Bucket b = (Bucket) obj;
263        free += b.freeCount();
264        used += b.usedCount();
265      }
266      return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
267    }
268
269    @Override
270    public String toString() {
271      return MoreObjects.toStringHelper(this.getClass())
272        .add("sizeIndex", sizeIndex)
273        .add("bucketSize", bucketSizes[sizeIndex])
274        .toString();
275    }
276  }
277
278  // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better
279  // reset it according to your cluster's block size distribution
280  // TODO Support the view of block size distribution statistics
281  // TODO: Why we add the extra 1024 bytes? Slop?
282  private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
283      16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
284      56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
285      192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
286      512 * 1024 + 1024 };
287
288  /**
289   * Round up the given block size to bucket size, and get the corresponding
290   * BucketSizeInfo
291   */
292  public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
293    for (int i = 0; i < bucketSizes.length; ++i)
294      if (blockSize <= bucketSizes[i])
295        return bucketSizeInfos[i];
296    return null;
297  }
298
  /**
   * The minimum number of items a single bucket must be able to hold. The capacity of every
   * bucket is this value multiplied by the largest configured bucket size.
   */
  static public final int FEWEST_ITEMS_IN_BUCKET = 4;

  // Configured item sizes, one per size index; sorted ascending by the constructor
  private final int[] bucketSizes;
  // Largest value in bucketSizes
  private final int bigItemSize;
  // The capacity size for each bucket: FEWEST_ITEMS_IN_BUCKET * bigItemSize
  private final long bucketCapacity;
  // All buckets; bucket i starts at offset bucketCapacity * i
  private Bucket[] buckets;
  // Per-size bookkeeping, indexed by size index (parallel to bucketSizes)
  private BucketSizeInfo[] bucketSizeInfos;
  // buckets.length * bucketCapacity
  private final long totalSize;
  // Sum of the item allocation sizes of all currently allocated blocks
  private long usedSize = 0;
312
313  BucketAllocator(long availableSpace, int[] bucketSizes)
314      throws BucketAllocatorException {
315    this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
316    Arrays.sort(this.bucketSizes);
317    this.bigItemSize = Ints.max(this.bucketSizes);
318    this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * (long) bigItemSize;
319    buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
320    if (buckets.length < this.bucketSizes.length)
321      throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length +
322        "); must have room for at least " + this.bucketSizes.length + " buckets");
323    bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
324    for (int i = 0; i < this.bucketSizes.length; ++i) {
325      bucketSizeInfos[i] = new BucketSizeInfo(i);
326    }
327    for (int i = 0; i < buckets.length; ++i) {
328      buckets[i] = new Bucket(bucketCapacity * i);
329      bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
330          .instantiateBucket(buckets[i]);
331    }
332    this.totalSize = ((long) buckets.length) * bucketCapacity;
333    if (LOG.isInfoEnabled()) {
334      LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length +
335        ", bucket capacity=" + this.bucketCapacity +
336        "=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" +
337        "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))");
338    }
339  }
340
341  /**
342   * Rebuild the allocator's data structures from a persisted map.
343   * @param availableSpace capacity of cache
344   * @param map A map stores the block key and BucketEntry(block's meta data
345   *          like offset, length)
346   * @param realCacheSize cached data size statistics for bucket cache
347   * @throws BucketAllocatorException
348   */
349  BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
350      LongAdder realCacheSize) throws BucketAllocatorException {
351    this(availableSpace, bucketSizes);
352
353    // each bucket has an offset, sizeindex. probably the buckets are too big
354    // in our default state. so what we do is reconfigure them according to what
355    // we've found. we can only reconfigure each bucket once; if more than once,
356    // we know there's a bug, so we just log the info, throw, and start again...
357    boolean[] reconfigured = new boolean[buckets.length];
358    int sizeNotMatchedCount = 0;
359    int insufficientCapacityCount = 0;
360    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
361    while (iterator.hasNext()) {
362      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
363      long foundOffset = entry.getValue().offset();
364      int foundLen = entry.getValue().getLength();
365      int bucketSizeIndex = -1;
366      for (int i = 0; i < this.bucketSizes.length; ++i) {
367        if (foundLen <= this.bucketSizes[i]) {
368          bucketSizeIndex = i;
369          break;
370        }
371      }
372      if (bucketSizeIndex == -1) {
373        sizeNotMatchedCount++;
374        iterator.remove();
375        continue;
376      }
377      int bucketNo = (int) (foundOffset / bucketCapacity);
378      if (bucketNo < 0 || bucketNo >= buckets.length) {
379        insufficientCapacityCount++;
380        iterator.remove();
381        continue;
382      }
383      Bucket b = buckets[bucketNo];
384      if (reconfigured[bucketNo]) {
385        if (b.sizeIndex() != bucketSizeIndex) {
386          throw new BucketAllocatorException("Inconsistent allocation in bucket map;");
387        }
388      } else {
389        if (!b.isCompletelyFree()) {
390          throw new BucketAllocatorException(
391              "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data");
392        }
393        // Need to remove the bucket from whichever list it's currently in at
394        // the moment...
395        BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
396        BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
397        oldbsi.removeBucket(b);
398        bsi.instantiateBucket(b);
399        reconfigured[bucketNo] = true;
400      }
401      realCacheSize.add(foundLen);
402      buckets[bucketNo].addAllocation(foundOffset);
403      usedSize += buckets[bucketNo].getItemAllocationSize();
404      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
405    }
406
407    if (sizeNotMatchedCount > 0) {
408      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " +
409        "there is no matching bucket size for these blocks");
410    }
411    if (insufficientCapacityCount > 0) {
412      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
413        + "did you shrink the cache?");
414    }
415  }
416
417  @Override
418  public String toString() {
419    StringBuilder sb = new StringBuilder(1024);
420    for (int i = 0; i < buckets.length; ++i) {
421      Bucket b = buckets[i];
422      if (i > 0) sb.append(", ");
423      sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
424      sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
425    }
426    return sb.toString();
427  }
428
429  public long getUsedSize() {
430    return this.usedSize;
431  }
432
433  public long getFreeSize() {
434    return this.totalSize - getUsedSize();
435  }
436
437  public long getTotalSize() {
438    return this.totalSize;
439  }
440
441  /**
442   * Allocate a block with specified size. Return the offset
443   * @param blockSize size of block
444   * @throws BucketAllocatorException
445   * @throws CacheFullException
446   * @return the offset in the IOEngine
447   */
448  public synchronized long allocateBlock(int blockSize) throws CacheFullException,
449      BucketAllocatorException {
450    assert blockSize > 0;
451    BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
452    if (bsi == null) {
453      throw new BucketAllocatorException("Allocation too big size=" + blockSize +
454        "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
455        " to accomodate if size seems reasonable and you want it cached.");
456    }
457    long offset = bsi.allocateBlock();
458
459    // Ask caller to free up space and try again!
460    if (offset < 0)
461      throw new CacheFullException(blockSize, bsi.sizeIndex());
462    usedSize += bucketSizes[bsi.sizeIndex()];
463    return offset;
464  }
465
466  private Bucket grabGlobalCompletelyFreeBucket() {
467    for (BucketSizeInfo bsi : bucketSizeInfos) {
468      Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
469      if (b != null) return b;
470    }
471    return null;
472  }
473
474  /**
475   * Free a block with the offset
476   * @param offset block's offset
477   * @return size freed
478   */
479  public synchronized int freeBlock(long offset) {
480    int bucketNo = (int) (offset / bucketCapacity);
481    assert bucketNo >= 0 && bucketNo < buckets.length;
482    Bucket targetBucket = buckets[bucketNo];
483    bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
484    usedSize -= targetBucket.getItemAllocationSize();
485    return targetBucket.getItemAllocationSize();
486  }
487
488  public int sizeIndexOfAllocation(long offset) {
489    int bucketNo = (int) (offset / bucketCapacity);
490    assert bucketNo >= 0 && bucketNo < buckets.length;
491    Bucket targetBucket = buckets[bucketNo];
492    return targetBucket.sizeIndex();
493  }
494
495  public int sizeOfAllocation(long offset) {
496    int bucketNo = (int) (offset / bucketCapacity);
497    assert bucketNo >= 0 && bucketNo < buckets.length;
498    Bucket targetBucket = buckets[bucketNo];
499    return targetBucket.getItemAllocationSize();
500  }
501
502  static class IndexStatistics {
503    private long freeCount, usedCount, itemSize, totalCount;
504
505    public long freeCount() {
506      return freeCount;
507    }
508
509    public long usedCount() {
510      return usedCount;
511    }
512
513    public long totalCount() {
514      return totalCount;
515    }
516
517    public long freeBytes() {
518      return freeCount * itemSize;
519    }
520
521    public long usedBytes() {
522      return usedCount * itemSize;
523    }
524
525    public long totalBytes() {
526      return totalCount * itemSize;
527    }
528
529    public long itemSize() {
530      return itemSize;
531    }
532
533    public IndexStatistics(long free, long used, long itemSize) {
534      setTo(free, used, itemSize);
535    }
536
537    public IndexStatistics() {
538      setTo(-1, -1, 0);
539    }
540
541    public void setTo(long free, long used, long itemSize) {
542      this.itemSize = itemSize;
543      this.freeCount = free;
544      this.usedCount = used;
545      this.totalCount = free + used;
546    }
547  }
548
549  public Bucket [] getBuckets() {
550    return this.buckets;
551  }
552
553  void logStatistics() {
554    IndexStatistics total = new IndexStatistics();
555    IndexStatistics[] stats = getIndexStatistics(total);
556    LOG.info("Bucket allocator statistics follow:\n");
557    LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
558        + total.usedBytes() + "; total bytes=" + total.totalBytes());
559    for (IndexStatistics s : stats) {
560      LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
561          + "; free=" + s.freeCount() + "; total=" + s.totalCount());
562    }
563  }
564
565  IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
566    IndexStatistics[] stats = getIndexStatistics();
567    long totalfree = 0, totalused = 0;
568    for (IndexStatistics stat : stats) {
569      totalfree += stat.freeBytes();
570      totalused += stat.usedBytes();
571    }
572    grandTotal.setTo(totalfree, totalused, 1);
573    return stats;
574  }
575
576  IndexStatistics[] getIndexStatistics() {
577    IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
578    for (int i = 0; i < stats.length; ++i)
579      stats[i] = bucketSizeInfos[i].statistics();
580    return stats;
581  }
582
583  public long freeBlock(long freeList[]) {
584    long sz = 0;
585    for (int i = 0; i < freeList.length; ++i)
586      sz += freeBlock(freeList[i]);
587    return sz;
588  }
589
590  public int getBucketIndex(long offset) {
591    return (int) (offset / bucketCapacity);
592  }
593
594  /**
595   * Returns a set of indices of the buckets that are least filled
596   * excluding the offsets, we also the fully free buckets for the
597   * BucketSizes where everything is empty and they only have one
598   * completely free bucket as a reserved
599   *
600   * @param excludedBuckets the buckets that need to be excluded due to
601   *                        currently being in used
602   * @param bucketCount     max Number of buckets to return
603   * @return set of bucket indices which could be used for eviction
604   */
605  public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
606                                            int bucketCount) {
607    Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy(
608        new Comparator<Integer>() {
609          @Override
610          public int compare(Integer left, Integer right) {
611            // We will always get instantiated buckets
612            return Float.compare(
613                ((float) buckets[left].usedCount) / buckets[left].itemCount,
614                ((float) buckets[right].usedCount) / buckets[right].itemCount);
615          }
616        }).maximumSize(bucketCount).create();
617
618    for (int i = 0; i < buckets.length; i ++ ) {
619      if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
620          // Avoid the buckets that are the only buckets for a sizeIndex
621          bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
622        queue.add(i);
623      }
624    }
625
626    Set<Integer> result = new HashSet<>(bucketCount);
627    result.addAll(queue);
628
629    return result;
630  }
631}