001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io.hfile.bucket;
020
021import java.util.Arrays;
022import java.util.Comparator;
023import java.util.HashSet;
024import java.util.Iterator;
025import java.util.Map;
026import java.util.Queue;
027import java.util.Set;
028import java.util.concurrent.atomic.LongAdder;
029import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
030import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
031import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
038import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
039import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
040import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.LinkedMap;
041
042/**
043 * This class is used to allocate a block with specified size and free the block when evicting. It
044 * manages an array of buckets, each bucket is associated with a size and caches elements up to this
045 * size. For a completely empty bucket, this size could be re-specified dynamically.
046 * <p/>
047 * This class is not thread safe.
048 */
049@InterfaceAudience.Private
050public final class BucketAllocator {
051  private static final Logger LOG = LoggerFactory.getLogger(BucketAllocator.class);
052
053  public final static class Bucket {
054    private long baseOffset;
055    private int itemAllocationSize, sizeIndex;
056    private int itemCount;
057    private int freeList[];
058    private int freeCount, usedCount;
059
060    public Bucket(long offset) {
061      baseOffset = offset;
062      sizeIndex = -1;
063    }
064
065    void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
066      Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
067      this.sizeIndex = sizeIndex;
068      itemAllocationSize = bucketSizes[sizeIndex];
069      itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
070      freeCount = itemCount;
071      usedCount = 0;
072      freeList = new int[itemCount];
073      for (int i = 0; i < freeCount; ++i)
074        freeList[i] = i;
075    }
076
077    public boolean isUninstantiated() {
078      return sizeIndex == -1;
079    }
080
081    public int sizeIndex() {
082      return sizeIndex;
083    }
084
085    public int getItemAllocationSize() {
086      return itemAllocationSize;
087    }
088
089    public boolean hasFreeSpace() {
090      return freeCount > 0;
091    }
092
093    public boolean isCompletelyFree() {
094      return usedCount == 0;
095    }
096
097    public int freeCount() {
098      return freeCount;
099    }
100
101    public int usedCount() {
102      return usedCount;
103    }
104
105    public int getFreeBytes() {
106      return freeCount * itemAllocationSize;
107    }
108
109    public int getUsedBytes() {
110      return usedCount * itemAllocationSize;
111    }
112
113    public long getBaseOffset() {
114      return baseOffset;
115    }
116
117    /**
118     * Allocate a block in this bucket, return the offset representing the
119     * position in physical space
120     * @return the offset in the IOEngine
121     */
122    public long allocate() {
123      assert freeCount > 0; // Else should not have been called
124      assert sizeIndex != -1;
125      ++usedCount;
126      long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
127      assert offset >= 0;
128      return offset;
129    }
130
131    public void addAllocation(long offset) throws BucketAllocatorException {
132      offset -= baseOffset;
133      if (offset < 0 || offset % itemAllocationSize != 0)
134        throw new BucketAllocatorException(
135            "Attempt to add allocation for bad offset: " + offset + " base="
136                + baseOffset + ", bucket size=" + itemAllocationSize);
137      int idx = (int) (offset / itemAllocationSize);
138      boolean matchFound = false;
139      for (int i = 0; i < freeCount; ++i) {
140        if (matchFound) freeList[i - 1] = freeList[i];
141        else if (freeList[i] == idx) matchFound = true;
142      }
143      if (!matchFound)
144        throw new BucketAllocatorException("Couldn't find match for index "
145            + idx + " in free list");
146      ++usedCount;
147      --freeCount;
148    }
149
150    private void free(long offset) {
151      offset -= baseOffset;
152      assert offset >= 0;
153      assert offset < itemCount * itemAllocationSize;
154      assert offset % itemAllocationSize == 0;
155      assert usedCount > 0;
156      assert freeCount < itemCount; // Else duplicate free
157      int item = (int) (offset / (long) itemAllocationSize);
158      assert !freeListContains(item);
159      --usedCount;
160      freeList[freeCount++] = item;
161    }
162
163    private boolean freeListContains(int blockNo) {
164      for (int i = 0; i < freeCount; ++i) {
165        if (freeList[i] == blockNo) return true;
166      }
167      return false;
168    }
169  }
170
171  final class BucketSizeInfo {
172    // Free bucket means it has space to allocate a block;
173    // Completely free bucket means it has no block.
174    private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
175    private int sizeIndex;
176
177    BucketSizeInfo(int sizeIndex) {
178      bucketList = new LinkedMap();
179      freeBuckets = new LinkedMap();
180      completelyFreeBuckets = new LinkedMap();
181      this.sizeIndex = sizeIndex;
182    }
183
184    public synchronized void instantiateBucket(Bucket b) {
185      assert b.isUninstantiated() || b.isCompletelyFree();
186      b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
187      bucketList.put(b, b);
188      freeBuckets.put(b, b);
189      completelyFreeBuckets.put(b, b);
190    }
191
192    public int sizeIndex() {
193      return sizeIndex;
194    }
195
196    /**
197     * Find a bucket to allocate a block
198     * @return the offset in the IOEngine
199     */
200    public long allocateBlock() {
201      Bucket b = null;
202      if (freeBuckets.size() > 0) {
203        // Use up an existing one first...
204        b = (Bucket) freeBuckets.lastKey();
205      }
206      if (b == null) {
207        b = grabGlobalCompletelyFreeBucket();
208        if (b != null) instantiateBucket(b);
209      }
210      if (b == null) return -1;
211      long result = b.allocate();
212      blockAllocated(b);
213      return result;
214    }
215
216    void blockAllocated(Bucket b) {
217      if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
218      if (!b.hasFreeSpace()) freeBuckets.remove(b);
219    }
220
221    public Bucket findAndRemoveCompletelyFreeBucket() {
222      Bucket b = null;
223      assert bucketList.size() > 0;
224      if (bucketList.size() == 1) {
225        // So we never get complete starvation of a bucket for a size
226        return null;
227      }
228
229      if (completelyFreeBuckets.size() > 0) {
230        b = (Bucket) completelyFreeBuckets.firstKey();
231        removeBucket(b);
232      }
233      return b;
234    }
235
236    private synchronized void removeBucket(Bucket b) {
237      assert b.isCompletelyFree();
238      bucketList.remove(b);
239      freeBuckets.remove(b);
240      completelyFreeBuckets.remove(b);
241    }
242
243    public void freeBlock(Bucket b, long offset) {
244      assert bucketList.containsKey(b);
245      // else we shouldn't have anything to free...
246      assert (!completelyFreeBuckets.containsKey(b));
247      b.free(offset);
248      if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
249      if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
250    }
251
252    public synchronized IndexStatistics statistics() {
253      long free = 0, used = 0;
254      for (Object obj : bucketList.keySet()) {
255        Bucket b = (Bucket) obj;
256        free += b.freeCount();
257        used += b.usedCount();
258      }
259      return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
260    }
261
262    @Override
263    public String toString() {
264      return MoreObjects.toStringHelper(this.getClass())
265        .add("sizeIndex", sizeIndex)
266        .add("bucketSize", bucketSizes[sizeIndex])
267        .toString();
268    }
269  }
270
271  // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better
272  // reset it according to your cluster's block size distribution
273  // TODO Support the view of block size distribution statistics
274  // TODO: Why we add the extra 1024 bytes? Slop?
275  private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
276      16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
277      56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
278      192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
279      512 * 1024 + 1024 };
280
281  /**
282   * Round up the given block size to bucket size, and get the corresponding
283   * BucketSizeInfo
284   */
285  public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
286    for (int i = 0; i < bucketSizes.length; ++i)
287      if (blockSize <= bucketSizes[i])
288        return bucketSizeInfos[i];
289    return null;
290  }
291
292  /**
293   * So, what is the minimum amount of items we'll tolerate in a single bucket?
294   */
295  static public final int FEWEST_ITEMS_IN_BUCKET = 4;
296
297  private final int[] bucketSizes;
298  private final int bigItemSize;
299  // The capacity size for each bucket
300  private final long bucketCapacity;
301  private Bucket[] buckets;
302  private BucketSizeInfo[] bucketSizeInfos;
303  private final long totalSize;
304  private transient long usedSize = 0;
305
306  BucketAllocator(long availableSpace, int[] bucketSizes)
307      throws BucketAllocatorException {
308    this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
309    Arrays.sort(this.bucketSizes);
310    this.bigItemSize = Ints.max(this.bucketSizes);
311    this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * (long) bigItemSize;
312    buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
313    if (buckets.length < this.bucketSizes.length)
314      throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length +
315        "); must have room for at least " + this.bucketSizes.length + " buckets");
316    bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
317    for (int i = 0; i < this.bucketSizes.length; ++i) {
318      bucketSizeInfos[i] = new BucketSizeInfo(i);
319    }
320    for (int i = 0; i < buckets.length; ++i) {
321      buckets[i] = new Bucket(bucketCapacity * i);
322      bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
323          .instantiateBucket(buckets[i]);
324    }
325    this.totalSize = ((long) buckets.length) * bucketCapacity;
326    if (LOG.isInfoEnabled()) {
327      LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length +
328        ", bucket capacity=" + this.bucketCapacity +
329        "=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" +
330        "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))");
331    }
332  }
333
334  /**
335   * Rebuild the allocator's data structures from a persisted map.
336   * @param availableSpace capacity of cache
337   * @param map A map stores the block key and BucketEntry(block's meta data
338   *          like offset, length)
339   * @param realCacheSize cached data size statistics for bucket cache
340   * @throws BucketAllocatorException
341   */
342  BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
343      LongAdder realCacheSize) throws BucketAllocatorException {
344    this(availableSpace, bucketSizes);
345
346    // each bucket has an offset, sizeindex. probably the buckets are too big
347    // in our default state. so what we do is reconfigure them according to what
348    // we've found. we can only reconfigure each bucket once; if more than once,
349    // we know there's a bug, so we just log the info, throw, and start again...
350    boolean[] reconfigured = new boolean[buckets.length];
351    int sizeNotMatchedCount = 0;
352    int insufficientCapacityCount = 0;
353    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
354    while (iterator.hasNext()) {
355      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
356      long foundOffset = entry.getValue().offset();
357      int foundLen = entry.getValue().getLength();
358      int bucketSizeIndex = -1;
359      for (int i = 0; i < this.bucketSizes.length; ++i) {
360        if (foundLen <= this.bucketSizes[i]) {
361          bucketSizeIndex = i;
362          break;
363        }
364      }
365      if (bucketSizeIndex == -1) {
366        sizeNotMatchedCount++;
367        iterator.remove();
368        continue;
369      }
370      int bucketNo = (int) (foundOffset / bucketCapacity);
371      if (bucketNo < 0 || bucketNo >= buckets.length) {
372        insufficientCapacityCount++;
373        iterator.remove();
374        continue;
375      }
376      Bucket b = buckets[bucketNo];
377      if (reconfigured[bucketNo]) {
378        if (b.sizeIndex() != bucketSizeIndex) {
379          throw new BucketAllocatorException("Inconsistent allocation in bucket map;");
380        }
381      } else {
382        if (!b.isCompletelyFree()) {
383          throw new BucketAllocatorException(
384              "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data");
385        }
386        // Need to remove the bucket from whichever list it's currently in at
387        // the moment...
388        BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
389        BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
390        oldbsi.removeBucket(b);
391        bsi.instantiateBucket(b);
392        reconfigured[bucketNo] = true;
393      }
394      realCacheSize.add(foundLen);
395      buckets[bucketNo].addAllocation(foundOffset);
396      usedSize += buckets[bucketNo].getItemAllocationSize();
397      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
398    }
399
400    if (sizeNotMatchedCount > 0) {
401      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " +
402        "there is no matching bucket size for these blocks");
403    }
404    if (insufficientCapacityCount > 0) {
405      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
406        + "did you shrink the cache?");
407    }
408  }
409
410  @Override
411  public String toString() {
412    StringBuilder sb = new StringBuilder(1024);
413    for (int i = 0; i < buckets.length; ++i) {
414      Bucket b = buckets[i];
415      if (i > 0) sb.append(", ");
416      sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
417      sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
418    }
419    return sb.toString();
420  }
421
422  public long getUsedSize() {
423    return this.usedSize;
424  }
425
426  public long getFreeSize() {
427    return this.totalSize - getUsedSize();
428  }
429
430  public long getTotalSize() {
431    return this.totalSize;
432  }
433
434  /**
435   * Allocate a block with specified size. Return the offset
436   * @param blockSize size of block
437   * @throws BucketAllocatorException
438   * @throws CacheFullException
439   * @return the offset in the IOEngine
440   */
441  public synchronized long allocateBlock(int blockSize) throws CacheFullException,
442      BucketAllocatorException {
443    assert blockSize > 0;
444    BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
445    if (bsi == null) {
446      throw new BucketAllocatorException("Allocation too big size=" + blockSize +
447        "; adjust BucketCache sizes " + BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY +
448        " to accomodate if size seems reasonable and you want it cached.");
449    }
450    long offset = bsi.allocateBlock();
451
452    // Ask caller to free up space and try again!
453    if (offset < 0)
454      throw new CacheFullException(blockSize, bsi.sizeIndex());
455    usedSize += bucketSizes[bsi.sizeIndex()];
456    return offset;
457  }
458
459  private Bucket grabGlobalCompletelyFreeBucket() {
460    for (BucketSizeInfo bsi : bucketSizeInfos) {
461      Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
462      if (b != null) return b;
463    }
464    return null;
465  }
466
467  /**
468   * Free a block with the offset
469   * @param offset block's offset
470   * @return size freed
471   */
472  public synchronized int freeBlock(long offset) {
473    int bucketNo = (int) (offset / bucketCapacity);
474    assert bucketNo >= 0 && bucketNo < buckets.length;
475    Bucket targetBucket = buckets[bucketNo];
476    bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
477    usedSize -= targetBucket.getItemAllocationSize();
478    return targetBucket.getItemAllocationSize();
479  }
480
481  public int sizeIndexOfAllocation(long offset) {
482    int bucketNo = (int) (offset / bucketCapacity);
483    assert bucketNo >= 0 && bucketNo < buckets.length;
484    Bucket targetBucket = buckets[bucketNo];
485    return targetBucket.sizeIndex();
486  }
487
488  public int sizeOfAllocation(long offset) {
489    int bucketNo = (int) (offset / bucketCapacity);
490    assert bucketNo >= 0 && bucketNo < buckets.length;
491    Bucket targetBucket = buckets[bucketNo];
492    return targetBucket.getItemAllocationSize();
493  }
494
495  static class IndexStatistics {
496    private long freeCount, usedCount, itemSize, totalCount;
497
498    public long freeCount() {
499      return freeCount;
500    }
501
502    public long usedCount() {
503      return usedCount;
504    }
505
506    public long totalCount() {
507      return totalCount;
508    }
509
510    public long freeBytes() {
511      return freeCount * itemSize;
512    }
513
514    public long usedBytes() {
515      return usedCount * itemSize;
516    }
517
518    public long totalBytes() {
519      return totalCount * itemSize;
520    }
521
522    public long itemSize() {
523      return itemSize;
524    }
525
526    public IndexStatistics(long free, long used, long itemSize) {
527      setTo(free, used, itemSize);
528    }
529
530    public IndexStatistics() {
531      setTo(-1, -1, 0);
532    }
533
534    public void setTo(long free, long used, long itemSize) {
535      this.itemSize = itemSize;
536      this.freeCount = free;
537      this.usedCount = used;
538      this.totalCount = free + used;
539    }
540  }
541
542  public Bucket [] getBuckets() {
543    return this.buckets;
544  }
545
546  void logStatistics() {
547    IndexStatistics total = new IndexStatistics();
548    IndexStatistics[] stats = getIndexStatistics(total);
549    LOG.info("Bucket allocator statistics follow:\n");
550    LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
551        + total.usedBytes() + "; total bytes=" + total.totalBytes());
552    for (IndexStatistics s : stats) {
553      LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
554          + "; free=" + s.freeCount() + "; total=" + s.totalCount());
555    }
556  }
557
558  IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
559    IndexStatistics[] stats = getIndexStatistics();
560    long totalfree = 0, totalused = 0;
561    for (IndexStatistics stat : stats) {
562      totalfree += stat.freeBytes();
563      totalused += stat.usedBytes();
564    }
565    grandTotal.setTo(totalfree, totalused, 1);
566    return stats;
567  }
568
569  IndexStatistics[] getIndexStatistics() {
570    IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
571    for (int i = 0; i < stats.length; ++i)
572      stats[i] = bucketSizeInfos[i].statistics();
573    return stats;
574  }
575
576  public long freeBlock(long freeList[]) {
577    long sz = 0;
578    for (int i = 0; i < freeList.length; ++i)
579      sz += freeBlock(freeList[i]);
580    return sz;
581  }
582
583  public int getBucketIndex(long offset) {
584    return (int) (offset / bucketCapacity);
585  }
586
587  /**
588   * Returns a set of indices of the buckets that are least filled
589   * excluding the offsets, we also the fully free buckets for the
590   * BucketSizes where everything is empty and they only have one
591   * completely free bucket as a reserved
592   *
593   * @param excludedBuckets the buckets that need to be excluded due to
594   *                        currently being in used
595   * @param bucketCount     max Number of buckets to return
596   * @return set of bucket indices which could be used for eviction
597   */
598  public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
599                                            int bucketCount) {
600    Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy(
601        new Comparator<Integer>() {
602          @Override
603          public int compare(Integer left, Integer right) {
604            // We will always get instantiated buckets
605            return Float.compare(
606                ((float) buckets[left].usedCount) / buckets[left].itemCount,
607                ((float) buckets[right].usedCount) / buckets[right].itemCount);
608          }
609        }).maximumSize(bucketCount).create();
610
611    for (int i = 0; i < buckets.length; i ++ ) {
612      if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
613          // Avoid the buckets that are the only buckets for a sizeIndex
614          bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
615        queue.add(i);
616      }
617    }
618
619    Set<Integer> result = new HashSet<>(bucketCount);
620    result.addAll(queue);
621
622    return result;
623  }
624}