View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.Arrays;
24  import java.util.Comparator;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.Map;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import com.google.common.collect.MinMaxPriorityQueue;
33  import org.apache.commons.collections.map.LinkedMap;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
38  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
39  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
40  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
41  
42  import com.google.common.base.Objects;
43  import com.google.common.base.Preconditions;
44  import com.google.common.primitives.Ints;
45  
46  /**
47   * This class is used to allocate a block with specified size and free the block
48   * when evicting. It manages an array of buckets, each bucket is associated with
49   * a size and caches elements up to this size. For a completely empty bucket, this
50   * size could be re-specified dynamically.
51   *
52   * This class is not thread safe.
53   */
54  @InterfaceAudience.Private
55  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
56  public final class BucketAllocator {
57    private static final Log LOG = LogFactory.getLog(BucketAllocator.class);
58  
59    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
60    public final static class Bucket {
61      private long baseOffset;
62      private int itemAllocationSize, sizeIndex;
63      private int itemCount;
64      private int freeList[];
65      private int freeCount, usedCount;
66  
67      public Bucket(long offset) {
68        baseOffset = offset;
69        sizeIndex = -1;
70      }
71  
72      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
73        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
74        this.sizeIndex = sizeIndex;
75        itemAllocationSize = bucketSizes[sizeIndex];
76        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
77        freeCount = itemCount;
78        usedCount = 0;
79        freeList = new int[itemCount];
80        for (int i = 0; i < freeCount; ++i)
81          freeList[i] = i;
82      }
83  
84      public boolean isUninstantiated() {
85        return sizeIndex == -1;
86      }
87  
88      public int sizeIndex() {
89        return sizeIndex;
90      }
91  
92      public int getItemAllocationSize() {
93        return itemAllocationSize;
94      }
95  
96      public boolean hasFreeSpace() {
97        return freeCount > 0;
98      }
99  
100     public boolean isCompletelyFree() {
101       return usedCount == 0;
102     }
103 
104     public int freeCount() {
105       return freeCount;
106     }
107 
108     public int usedCount() {
109       return usedCount;
110     }
111 
112     public int getFreeBytes() {
113       return freeCount * itemAllocationSize;
114     }
115 
116     public int getUsedBytes() {
117       return usedCount * itemAllocationSize;
118     }
119 
120     public long getBaseOffset() {
121       return baseOffset;
122     }
123 
124     /**
125      * Allocate a block in this bucket, return the offset representing the
126      * position in physical space
127      * @return the offset in the IOEngine
128      */
129     public long allocate() {
130       assert freeCount > 0; // Else should not have been called
131       assert sizeIndex != -1;
132       ++usedCount;
133       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
134       assert offset >= 0;
135       return offset;
136     }
137 
138     public void addAllocation(long offset) throws BucketAllocatorException {
139       offset -= baseOffset;
140       if (offset < 0 || offset % itemAllocationSize != 0)
141         throw new BucketAllocatorException(
142             "Attempt to add allocation for bad offset: " + offset + " base="
143                 + baseOffset + ", bucket size=" + itemAllocationSize);
144       int idx = (int) (offset / itemAllocationSize);
145       boolean matchFound = false;
146       for (int i = 0; i < freeCount; ++i) {
147         if (matchFound) freeList[i - 1] = freeList[i];
148         else if (freeList[i] == idx) matchFound = true;
149       }
150       if (!matchFound)
151         throw new BucketAllocatorException("Couldn't find match for index "
152             + idx + " in free list");
153       ++usedCount;
154       --freeCount;
155     }
156 
157     private void free(long offset) {
158       offset -= baseOffset;
159       assert offset >= 0;
160       assert offset < itemCount * itemAllocationSize;
161       assert offset % itemAllocationSize == 0;
162       assert usedCount > 0;
163       assert freeCount < itemCount; // Else duplicate free
164       int item = (int) (offset / (long) itemAllocationSize);
165       assert !freeListContains(item);
166       --usedCount;
167       freeList[freeCount++] = item;
168     }
169 
170     private boolean freeListContains(int blockNo) {
171       for (int i = 0; i < freeCount; ++i) {
172         if (freeList[i] == blockNo) return true;
173       }
174       return false;
175     }
176   }
177 
178   final class BucketSizeInfo {
179     // Free bucket means it has space to allocate a block;
180     // Completely free bucket means it has no block.
181     private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
182     private int sizeIndex;
183 
184     BucketSizeInfo(int sizeIndex) {
185       bucketList = new LinkedMap();
186       freeBuckets = new LinkedMap();
187       completelyFreeBuckets = new LinkedMap();
188       this.sizeIndex = sizeIndex;
189     }
190 
191     public synchronized void instantiateBucket(Bucket b) {
192       assert b.isUninstantiated() || b.isCompletelyFree();
193       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
194       bucketList.put(b, b);
195       freeBuckets.put(b, b);
196       completelyFreeBuckets.put(b, b);
197     }
198 
199     public int sizeIndex() {
200       return sizeIndex;
201     }
202 
203     /**
204      * Find a bucket to allocate a block
205      * @return the offset in the IOEngine
206      */
207     public long allocateBlock() {
208       Bucket b = null;
209       if (freeBuckets.size() > 0) {
210         // Use up an existing one first...
211         b = (Bucket) freeBuckets.lastKey();
212       }
213       if (b == null) {
214         b = grabGlobalCompletelyFreeBucket();
215         if (b != null) instantiateBucket(b);
216       }
217       if (b == null) return -1;
218       long result = b.allocate();
219       blockAllocated(b);
220       return result;
221     }
222 
223     void blockAllocated(Bucket b) {
224       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
225       if (!b.hasFreeSpace()) freeBuckets.remove(b);
226     }
227 
228     public Bucket findAndRemoveCompletelyFreeBucket() {
229       Bucket b = null;
230       assert bucketList.size() > 0;
231       if (bucketList.size() == 1) {
232         // So we never get complete starvation of a bucket for a size
233         return null;
234       }
235 
236       if (completelyFreeBuckets.size() > 0) {
237         b = (Bucket) completelyFreeBuckets.firstKey();
238         removeBucket(b);
239       }
240       return b;
241     }
242 
243     private synchronized void removeBucket(Bucket b) {
244       assert b.isCompletelyFree();
245       bucketList.remove(b);
246       freeBuckets.remove(b);
247       completelyFreeBuckets.remove(b);
248     }
249 
250     public void freeBlock(Bucket b, long offset) {
251       assert bucketList.containsKey(b);
252       // else we shouldn't have anything to free...
253       assert (!completelyFreeBuckets.containsKey(b));
254       b.free(offset);
255       if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
256       if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
257     }
258 
259     public synchronized IndexStatistics statistics() {
260       long free = 0, used = 0;
261       for (Object obj : bucketList.keySet()) {
262         Bucket b = (Bucket) obj;
263         free += b.freeCount();
264         used += b.usedCount();
265       }
266       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
267     }
268 
269     @Override
270     public String toString() {
271       return Objects.toStringHelper(this.getClass())
272         .add("sizeIndex", sizeIndex)
273         .add("bucketSize", bucketSizes[sizeIndex])
274         .toString();
275     }
276   }
277 
  // The default block size is 64K, so most of the default bucket sizes are
  // chosen close to 64K. Adjust them to match your cluster's block size
  // distribution.
  // TODO Support the view of block size distribution statistics
281   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
282       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
283       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
284       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
285       512 * 1024 + 1024 };
286 
287   /**
288    * Round up the given block size to bucket size, and get the corresponding
289    * BucketSizeInfo
290    */
291   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
292     for (int i = 0; i < bucketSizes.length; ++i)
293       if (blockSize <= bucketSizes[i])
294         return bucketSizeInfos[i];
295     return null;
296   }
297 
298   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
299 
300   private final int[] bucketSizes;
301   private final int bigItemSize;
302   // The capacity size for each bucket
303   private final long bucketCapacity;
304   private Bucket[] buckets;
305   private BucketSizeInfo[] bucketSizeInfos;
306   private final long totalSize;
307   private long usedSize = 0;
308 
309   BucketAllocator(long availableSpace, int[] bucketSizes)
310       throws BucketAllocatorException {
311     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
312     Arrays.sort(this.bucketSizes);
313     this.bigItemSize = Ints.max(this.bucketSizes);
314     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
315     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
316     if (buckets.length < this.bucketSizes.length)
317       throw new BucketAllocatorException(
318           "Bucket allocator size too small - must have room for at least "
319               + this.bucketSizes.length + " buckets");
320     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
321     for (int i = 0; i < this.bucketSizes.length; ++i) {
322       bucketSizeInfos[i] = new BucketSizeInfo(i);
323     }
324     for (int i = 0; i < buckets.length; ++i) {
325       buckets[i] = new Bucket(bucketCapacity * i);
326       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
327           .instantiateBucket(buckets[i]);
328     }
329     this.totalSize = ((long) buckets.length) * bucketCapacity;
330   }
331 
332   /**
333    * Rebuild the allocator's data structures from a persisted map.
334    * @param availableSpace capacity of cache
335    * @param map A map stores the block key and BucketEntry(block's meta data
336    *          like offset, length)
337    * @param realCacheSize cached data size statistics for bucket cache
338    * @throws BucketAllocatorException
339    */
340   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
341       AtomicLong realCacheSize) throws BucketAllocatorException {
342     this(availableSpace, bucketSizes);
343 
344     // each bucket has an offset, sizeindex. probably the buckets are too big
345     // in our default state. so what we do is reconfigure them according to what
346     // we've found. we can only reconfigure each bucket once; if more than once,
347     // we know there's a bug, so we just log the info, throw, and start again...
348     boolean[] reconfigured = new boolean[buckets.length];
349     int sizeNotMatchedCount = 0;
350     int insufficientCapacityCount = 0;
351     Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
352     while (iterator.hasNext()) {
353       Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
354       long foundOffset = entry.getValue().offset();
355       int foundLen = entry.getValue().getLength();
356       int bucketSizeIndex = -1;
357       for (int i = 0; i < this.bucketSizes.length; ++i) {
358         if (foundLen <= this.bucketSizes[i]) {
359           bucketSizeIndex = i;
360           break;
361         }
362       }
363       if (bucketSizeIndex == -1) {
364         sizeNotMatchedCount++;
365         iterator.remove();
366         continue;
367       }
368       int bucketNo = (int) (foundOffset / bucketCapacity);
369       if (bucketNo < 0 || bucketNo >= buckets.length) {
370         insufficientCapacityCount++;
371         iterator.remove();
372         continue;
373       }
374       Bucket b = buckets[bucketNo];
375       if (reconfigured[bucketNo]) {
376         if (b.sizeIndex() != bucketSizeIndex)
377           throw new BucketAllocatorException(
378               "Inconsistent allocation in bucket map;");
379       } else {
380         if (!b.isCompletelyFree())
381           throw new BucketAllocatorException("Reconfiguring bucket "
382               + bucketNo + " but it's already allocated; corrupt data");
383         // Need to remove the bucket from whichever list it's currently in at
384         // the moment...
385         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
386         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
387         oldbsi.removeBucket(b);
388         bsi.instantiateBucket(b);
389         reconfigured[bucketNo] = true;
390       }
391       realCacheSize.addAndGet(foundLen);
392       buckets[bucketNo].addAllocation(foundOffset);
393       usedSize += buckets[bucketNo].getItemAllocationSize();
394       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
395     }
396 
397     if (sizeNotMatchedCount > 0) {
398       LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because "
399           + "there is no matching bucket size for these blocks");
400     }
401     if (insufficientCapacityCount > 0) {
402       LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
403           + "did you shrink the cache?");
404     }
405   }
406 
407   public String toString() {
408     StringBuilder sb = new StringBuilder(1024);
409     for (int i = 0; i < buckets.length; ++i) {
410       Bucket b = buckets[i];
411       if (i > 0) sb.append(", ");
412       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
413       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
414     }
415     return sb.toString();
416   }
417 
418   public long getUsedSize() {
419     return this.usedSize;
420   }
421 
422   public long getFreeSize() {
423     return this.totalSize - getUsedSize();
424   }
425 
426   public long getTotalSize() {
427     return this.totalSize;
428   }
429 
430   /**
431    * Allocate a block with specified size. Return the offset
432    * @param blockSize size of block
433    * @throws BucketAllocatorException
434    * @throws CacheFullException
435    * @return the offset in the IOEngine
436    */
437   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
438       BucketAllocatorException {
439     assert blockSize > 0;
440     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
441     if (bsi == null) {
442       throw new BucketAllocatorException("Allocation too big size=" + blockSize +
443         "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
444         " to accomodate if size seems reasonable and you want it cached.");
445     }
446     long offset = bsi.allocateBlock();
447 
448     // Ask caller to free up space and try again!
449     if (offset < 0)
450       throw new CacheFullException(blockSize, bsi.sizeIndex());
451     usedSize += bucketSizes[bsi.sizeIndex()];
452     return offset;
453   }
454 
455   private Bucket grabGlobalCompletelyFreeBucket() {
456     for (BucketSizeInfo bsi : bucketSizeInfos) {
457       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
458       if (b != null) return b;
459     }
460     return null;
461   }
462 
463   /**
464    * Free a block with the offset
465    * @param offset block's offset
466    * @return size freed
467    */
468   public synchronized int freeBlock(long offset) {
469     int bucketNo = (int) (offset / bucketCapacity);
470     assert bucketNo >= 0 && bucketNo < buckets.length;
471     Bucket targetBucket = buckets[bucketNo];
472     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
473     usedSize -= targetBucket.getItemAllocationSize();
474     return targetBucket.getItemAllocationSize();
475   }
476 
477   public int sizeIndexOfAllocation(long offset) {
478     int bucketNo = (int) (offset / bucketCapacity);
479     assert bucketNo >= 0 && bucketNo < buckets.length;
480     Bucket targetBucket = buckets[bucketNo];
481     return targetBucket.sizeIndex();
482   }
483 
484   public int sizeOfAllocation(long offset) {
485     int bucketNo = (int) (offset / bucketCapacity);
486     assert bucketNo >= 0 && bucketNo < buckets.length;
487     Bucket targetBucket = buckets[bucketNo];
488     return targetBucket.getItemAllocationSize();
489   }
490 
491   static class IndexStatistics {
492     private long freeCount, usedCount, itemSize, totalCount;
493 
494     public long freeCount() {
495       return freeCount;
496     }
497 
498     public long usedCount() {
499       return usedCount;
500     }
501 
502     public long totalCount() {
503       return totalCount;
504     }
505 
506     public long freeBytes() {
507       return freeCount * itemSize;
508     }
509 
510     public long usedBytes() {
511       return usedCount * itemSize;
512     }
513 
514     public long totalBytes() {
515       return totalCount * itemSize;
516     }
517 
518     public long itemSize() {
519       return itemSize;
520     }
521 
522     public IndexStatistics(long free, long used, long itemSize) {
523       setTo(free, used, itemSize);
524     }
525 
526     public IndexStatistics() {
527       setTo(-1, -1, 0);
528     }
529 
530     public void setTo(long free, long used, long itemSize) {
531       this.itemSize = itemSize;
532       this.freeCount = free;
533       this.usedCount = used;
534       this.totalCount = free + used;
535     }
536   }
537 
538   public Bucket [] getBuckets() {
539     return this.buckets;
540   }
541 
542   void logStatistics() {
543     IndexStatistics total = new IndexStatistics();
544     IndexStatistics[] stats = getIndexStatistics(total);
545     LOG.info("Bucket allocator statistics follow:\n");
546     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
547         + total.usedBytes() + "; total bytes=" + total.totalBytes());
548     for (IndexStatistics s : stats) {
549       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
550           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
551     }
552   }
553 
554   IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
555     IndexStatistics[] stats = getIndexStatistics();
556     long totalfree = 0, totalused = 0;
557     for (IndexStatistics stat : stats) {
558       totalfree += stat.freeBytes();
559       totalused += stat.usedBytes();
560     }
561     grandTotal.setTo(totalfree, totalused, 1);
562     return stats;
563   }
564 
565   IndexStatistics[] getIndexStatistics() {
566     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
567     for (int i = 0; i < stats.length; ++i)
568       stats[i] = bucketSizeInfos[i].statistics();
569     return stats;
570   }
571 
572   public long freeBlock(long freeList[]) {
573     long sz = 0;
574     for (int i = 0; i < freeList.length; ++i)
575       sz += freeBlock(freeList[i]);
576     return sz;
577   }
578 
579   public int getBucketIndex(long offset) {
580     return (int) (offset / bucketCapacity);
581   }
582 
583   /**
584    * Returns a set of indices of the buckets that are least filled
585    * excluding the offsets, we also the fully free buckets for the
586    * BucketSizes where everything is empty and they only have one
587    * completely free bucket as a reserved
588    *
589    * @param excludedBuckets the buckets that need to be excluded due to
590    *                        currently being in used
591    * @param bucketCount     max Number of buckets to return
592    * @return set of bucket indices which could be used for eviction
593    */
594   public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
595                                             int bucketCount) {
596     Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy(
597         new Comparator<Integer>() {
598           @Override
599           public int compare(Integer left, Integer right) {
600             // We will always get instantiated buckets
601             return Float.compare(
602                 ((float) buckets[left].usedCount) / buckets[left].itemCount,
603                 ((float) buckets[right].usedCount) / buckets[right].itemCount);
604           }
605         }).maximumSize(bucketCount).create();
606 
607     for (int i = 0; i < buckets.length; i ++ ) {
608       if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
609           // Avoid the buckets that are the only buckets for a sizeIndex
610           bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
611         queue.add(i);
612       }
613     }
614 
615     Set<Integer> result = new HashSet<>(bucketCount);
616     result.addAll(queue);
617 
618     return result;
619   }
620 }