1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.io.hfile.bucket;
22
23 import java.util.Arrays;
24 import java.util.Comparator;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import com.google.common.collect.MinMaxPriorityQueue;
33 import org.apache.commons.collections.map.LinkedMap;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
38 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
39 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
40 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
41
42 import com.google.common.base.Objects;
43 import com.google.common.base.Preconditions;
44 import com.google.common.primitives.Ints;
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
56 public final class BucketAllocator {
57 private static final Log LOG = LogFactory.getLog(BucketAllocator.class);
58
59 @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
60 public final static class Bucket {
61 private long baseOffset;
62 private int itemAllocationSize, sizeIndex;
63 private int itemCount;
64 private int freeList[];
65 private int freeCount, usedCount;
66
67 public Bucket(long offset) {
68 baseOffset = offset;
69 sizeIndex = -1;
70 }
71
72 void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
73 Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
74 this.sizeIndex = sizeIndex;
75 itemAllocationSize = bucketSizes[sizeIndex];
76 itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
77 freeCount = itemCount;
78 usedCount = 0;
79 freeList = new int[itemCount];
80 for (int i = 0; i < freeCount; ++i)
81 freeList[i] = i;
82 }
83
84 public boolean isUninstantiated() {
85 return sizeIndex == -1;
86 }
87
88 public int sizeIndex() {
89 return sizeIndex;
90 }
91
92 public int getItemAllocationSize() {
93 return itemAllocationSize;
94 }
95
96 public boolean hasFreeSpace() {
97 return freeCount > 0;
98 }
99
100 public boolean isCompletelyFree() {
101 return usedCount == 0;
102 }
103
104 public int freeCount() {
105 return freeCount;
106 }
107
108 public int usedCount() {
109 return usedCount;
110 }
111
112 public int getFreeBytes() {
113 return freeCount * itemAllocationSize;
114 }
115
116 public int getUsedBytes() {
117 return usedCount * itemAllocationSize;
118 }
119
120 public long getBaseOffset() {
121 return baseOffset;
122 }
123
124
125
126
127
128
129 public long allocate() {
130 assert freeCount > 0;
131 assert sizeIndex != -1;
132 ++usedCount;
133 long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
134 assert offset >= 0;
135 return offset;
136 }
137
138 public void addAllocation(long offset) throws BucketAllocatorException {
139 offset -= baseOffset;
140 if (offset < 0 || offset % itemAllocationSize != 0)
141 throw new BucketAllocatorException(
142 "Attempt to add allocation for bad offset: " + offset + " base="
143 + baseOffset + ", bucket size=" + itemAllocationSize);
144 int idx = (int) (offset / itemAllocationSize);
145 boolean matchFound = false;
146 for (int i = 0; i < freeCount; ++i) {
147 if (matchFound) freeList[i - 1] = freeList[i];
148 else if (freeList[i] == idx) matchFound = true;
149 }
150 if (!matchFound)
151 throw new BucketAllocatorException("Couldn't find match for index "
152 + idx + " in free list");
153 ++usedCount;
154 --freeCount;
155 }
156
157 private void free(long offset) {
158 offset -= baseOffset;
159 assert offset >= 0;
160 assert offset < itemCount * itemAllocationSize;
161 assert offset % itemAllocationSize == 0;
162 assert usedCount > 0;
163 assert freeCount < itemCount;
164 int item = (int) (offset / (long) itemAllocationSize);
165 assert !freeListContains(item);
166 --usedCount;
167 freeList[freeCount++] = item;
168 }
169
170 private boolean freeListContains(int blockNo) {
171 for (int i = 0; i < freeCount; ++i) {
172 if (freeList[i] == blockNo) return true;
173 }
174 return false;
175 }
176 }
177
178 final class BucketSizeInfo {
179
180
181 private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
182 private int sizeIndex;
183
184 BucketSizeInfo(int sizeIndex) {
185 bucketList = new LinkedMap();
186 freeBuckets = new LinkedMap();
187 completelyFreeBuckets = new LinkedMap();
188 this.sizeIndex = sizeIndex;
189 }
190
191 public synchronized void instantiateBucket(Bucket b) {
192 assert b.isUninstantiated() || b.isCompletelyFree();
193 b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
194 bucketList.put(b, b);
195 freeBuckets.put(b, b);
196 completelyFreeBuckets.put(b, b);
197 }
198
199 public int sizeIndex() {
200 return sizeIndex;
201 }
202
203
204
205
206
207 public long allocateBlock() {
208 Bucket b = null;
209 if (freeBuckets.size() > 0) {
210
211 b = (Bucket) freeBuckets.lastKey();
212 }
213 if (b == null) {
214 b = grabGlobalCompletelyFreeBucket();
215 if (b != null) instantiateBucket(b);
216 }
217 if (b == null) return -1;
218 long result = b.allocate();
219 blockAllocated(b);
220 return result;
221 }
222
223 void blockAllocated(Bucket b) {
224 if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
225 if (!b.hasFreeSpace()) freeBuckets.remove(b);
226 }
227
228 public Bucket findAndRemoveCompletelyFreeBucket() {
229 Bucket b = null;
230 assert bucketList.size() > 0;
231 if (bucketList.size() == 1) {
232
233 return null;
234 }
235
236 if (completelyFreeBuckets.size() > 0) {
237 b = (Bucket) completelyFreeBuckets.firstKey();
238 removeBucket(b);
239 }
240 return b;
241 }
242
243 private synchronized void removeBucket(Bucket b) {
244 assert b.isCompletelyFree();
245 bucketList.remove(b);
246 freeBuckets.remove(b);
247 completelyFreeBuckets.remove(b);
248 }
249
250 public void freeBlock(Bucket b, long offset) {
251 assert bucketList.containsKey(b);
252
253 assert (!completelyFreeBuckets.containsKey(b));
254 b.free(offset);
255 if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
256 if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
257 }
258
259 public synchronized IndexStatistics statistics() {
260 long free = 0, used = 0;
261 for (Object obj : bucketList.keySet()) {
262 Bucket b = (Bucket) obj;
263 free += b.freeCount();
264 used += b.usedCount();
265 }
266 return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
267 }
268
269 @Override
270 public String toString() {
271 return Objects.toStringHelper(this.getClass())
272 .add("sizeIndex", sizeIndex)
273 .add("bucketSize", bucketSizes[sizeIndex])
274 .toString();
275 }
276 }
277
278
279
280
281 private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
282 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
283 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
284 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
285 512 * 1024 + 1024 };
286
287
288
289
290
291 public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
292 for (int i = 0; i < bucketSizes.length; ++i)
293 if (blockSize <= bucketSizes[i])
294 return bucketSizeInfos[i];
295 return null;
296 }
297
298 static public final int FEWEST_ITEMS_IN_BUCKET = 4;
299
300 private final int[] bucketSizes;
301 private final int bigItemSize;
302
303 private final long bucketCapacity;
304 private Bucket[] buckets;
305 private BucketSizeInfo[] bucketSizeInfos;
306 private final long totalSize;
307 private long usedSize = 0;
308
/**
 * Creates an allocator over {@code availableSpace} bytes. The space is cut
 * into buckets of {@code FEWEST_ITEMS_IN_BUCKET} times the largest item size.
 * The first buckets are configured one per item size in ascending order, and
 * any remaining buckets are all configured for the largest size.
 * @param availableSpace number of bytes to manage; any remainder smaller than
 *          one bucket is unused
 * @param bucketSizes item sizes to support, or null for the defaults. NOTE: a
 *          supplied array is sorted in place.
 * @throws BucketAllocatorException if the space cannot hold at least one
 *           bucket per item size
 */
BucketAllocator(long availableSpace, int[] bucketSizes)
    throws BucketAllocatorException {
  this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
  Arrays.sort(this.bucketSizes);
  this.bigItemSize = Ints.max(this.bucketSizes);
  // Widen to long before multiplying; an int product would silently overflow
  // for item sizes above Integer.MAX_VALUE / FEWEST_ITEMS_IN_BUCKET.
  this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * (long) bigItemSize;
  buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
  if (buckets.length < this.bucketSizes.length) {
    throw new BucketAllocatorException(
        "Bucket allocator size too small - must have room for at least "
            + this.bucketSizes.length + " buckets");
  }
  bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
  for (int i = 0; i < this.bucketSizes.length; ++i) {
    bucketSizeInfos[i] = new BucketSizeInfo(i);
  }
  for (int i = 0; i < buckets.length; ++i) {
    buckets[i] = new Bucket(bucketCapacity * i);
    // One bucket per size first; every surplus bucket goes to the last
    // (largest) size.
    bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
        .instantiateBucket(buckets[i]);
  }
  this.totalSize = ((long) buckets.length) * bucketCapacity;
}
331
332
333
334
335
336
337
338
339
340 BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
341 AtomicLong realCacheSize) throws BucketAllocatorException {
342 this(availableSpace, bucketSizes);
343
344
345
346
347
348 boolean[] reconfigured = new boolean[buckets.length];
349 int sizeNotMatchedCount = 0;
350 int insufficientCapacityCount = 0;
351 Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
352 while (iterator.hasNext()) {
353 Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
354 long foundOffset = entry.getValue().offset();
355 int foundLen = entry.getValue().getLength();
356 int bucketSizeIndex = -1;
357 for (int i = 0; i < this.bucketSizes.length; ++i) {
358 if (foundLen <= this.bucketSizes[i]) {
359 bucketSizeIndex = i;
360 break;
361 }
362 }
363 if (bucketSizeIndex == -1) {
364 sizeNotMatchedCount++;
365 iterator.remove();
366 continue;
367 }
368 int bucketNo = (int) (foundOffset / bucketCapacity);
369 if (bucketNo < 0 || bucketNo >= buckets.length) {
370 insufficientCapacityCount++;
371 iterator.remove();
372 continue;
373 }
374 Bucket b = buckets[bucketNo];
375 if (reconfigured[bucketNo]) {
376 if (b.sizeIndex() != bucketSizeIndex)
377 throw new BucketAllocatorException(
378 "Inconsistent allocation in bucket map;");
379 } else {
380 if (!b.isCompletelyFree())
381 throw new BucketAllocatorException("Reconfiguring bucket "
382 + bucketNo + " but it's already allocated; corrupt data");
383
384
385 BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
386 BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
387 oldbsi.removeBucket(b);
388 bsi.instantiateBucket(b);
389 reconfigured[bucketNo] = true;
390 }
391 realCacheSize.addAndGet(foundLen);
392 buckets[bucketNo].addAllocation(foundOffset);
393 usedSize += buckets[bucketNo].getItemAllocationSize();
394 bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
395 }
396
397 if (sizeNotMatchedCount > 0) {
398 LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because "
399 + "there is no matching bucket size for these blocks");
400 }
401 if (insufficientCapacityCount > 0) {
402 LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
403 + "did you shrink the cache?");
404 }
405 }
406
407 public String toString() {
408 StringBuilder sb = new StringBuilder(1024);
409 for (int i = 0; i < buckets.length; ++i) {
410 Bucket b = buckets[i];
411 if (i > 0) sb.append(", ");
412 sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
413 sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
414 }
415 return sb.toString();
416 }
417
418 public long getUsedSize() {
419 return this.usedSize;
420 }
421
422 public long getFreeSize() {
423 return this.totalSize - getUsedSize();
424 }
425
426 public long getTotalSize() {
427 return this.totalSize;
428 }
429
430
431
432
433
434
435
436
437 public synchronized long allocateBlock(int blockSize) throws CacheFullException,
438 BucketAllocatorException {
439 assert blockSize > 0;
440 BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
441 if (bsi == null) {
442 throw new BucketAllocatorException("Allocation too big size=" + blockSize +
443 "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
444 " to accomodate if size seems reasonable and you want it cached.");
445 }
446 long offset = bsi.allocateBlock();
447
448
449 if (offset < 0)
450 throw new CacheFullException(blockSize, bsi.sizeIndex());
451 usedSize += bucketSizes[bsi.sizeIndex()];
452 return offset;
453 }
454
455 private Bucket grabGlobalCompletelyFreeBucket() {
456 for (BucketSizeInfo bsi : bucketSizeInfos) {
457 Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
458 if (b != null) return b;
459 }
460 return null;
461 }
462
463
464
465
466
467
468 public synchronized int freeBlock(long offset) {
469 int bucketNo = (int) (offset / bucketCapacity);
470 assert bucketNo >= 0 && bucketNo < buckets.length;
471 Bucket targetBucket = buckets[bucketNo];
472 bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
473 usedSize -= targetBucket.getItemAllocationSize();
474 return targetBucket.getItemAllocationSize();
475 }
476
477 public int sizeIndexOfAllocation(long offset) {
478 int bucketNo = (int) (offset / bucketCapacity);
479 assert bucketNo >= 0 && bucketNo < buckets.length;
480 Bucket targetBucket = buckets[bucketNo];
481 return targetBucket.sizeIndex();
482 }
483
484 public int sizeOfAllocation(long offset) {
485 int bucketNo = (int) (offset / bucketCapacity);
486 assert bucketNo >= 0 && bucketNo < buckets.length;
487 Bucket targetBucket = buckets[bucketNo];
488 return targetBucket.getItemAllocationSize();
489 }
490
491 static class IndexStatistics {
492 private long freeCount, usedCount, itemSize, totalCount;
493
494 public long freeCount() {
495 return freeCount;
496 }
497
498 public long usedCount() {
499 return usedCount;
500 }
501
502 public long totalCount() {
503 return totalCount;
504 }
505
506 public long freeBytes() {
507 return freeCount * itemSize;
508 }
509
510 public long usedBytes() {
511 return usedCount * itemSize;
512 }
513
514 public long totalBytes() {
515 return totalCount * itemSize;
516 }
517
518 public long itemSize() {
519 return itemSize;
520 }
521
522 public IndexStatistics(long free, long used, long itemSize) {
523 setTo(free, used, itemSize);
524 }
525
526 public IndexStatistics() {
527 setTo(-1, -1, 0);
528 }
529
530 public void setTo(long free, long used, long itemSize) {
531 this.itemSize = itemSize;
532 this.freeCount = free;
533 this.usedCount = used;
534 this.totalCount = free + used;
535 }
536 }
537
538 public Bucket [] getBuckets() {
539 return this.buckets;
540 }
541
542 void logStatistics() {
543 IndexStatistics total = new IndexStatistics();
544 IndexStatistics[] stats = getIndexStatistics(total);
545 LOG.info("Bucket allocator statistics follow:\n");
546 LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes="
547 + total.usedBytes() + "; total bytes=" + total.totalBytes());
548 for (IndexStatistics s : stats) {
549 LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount()
550 + "; free=" + s.freeCount() + "; total=" + s.totalCount());
551 }
552 }
553
554 IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
555 IndexStatistics[] stats = getIndexStatistics();
556 long totalfree = 0, totalused = 0;
557 for (IndexStatistics stat : stats) {
558 totalfree += stat.freeBytes();
559 totalused += stat.usedBytes();
560 }
561 grandTotal.setTo(totalfree, totalused, 1);
562 return stats;
563 }
564
565 IndexStatistics[] getIndexStatistics() {
566 IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
567 for (int i = 0; i < stats.length; ++i)
568 stats[i] = bucketSizeInfos[i].statistics();
569 return stats;
570 }
571
572 public long freeBlock(long freeList[]) {
573 long sz = 0;
574 for (int i = 0; i < freeList.length; ++i)
575 sz += freeBlock(freeList[i]);
576 return sz;
577 }
578
579 public int getBucketIndex(long offset) {
580 return (int) (offset / bucketCapacity);
581 }
582
583
584
585
586
587
588
589
590
591
592
593
594 public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
595 int bucketCount) {
596 Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy(
597 new Comparator<Integer>() {
598 @Override
599 public int compare(Integer left, Integer right) {
600
601 return Float.compare(
602 ((float) buckets[left].usedCount) / buckets[left].itemCount,
603 ((float) buckets[right].usedCount) / buckets[right].itemCount);
604 }
605 }).maximumSize(bucketCount).create();
606
607 for (int i = 0; i < buckets.length; i ++ ) {
608 if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
609
610 bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
611 queue.add(i);
612 }
613 }
614
615 Set<Integer> result = new HashSet<>(bucketCount);
616 result.addAll(queue);
617
618 return result;
619 }
620 }