001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.io.hfile.bucket; 020 021import java.util.Arrays; 022import java.util.Comparator; 023import java.util.HashSet; 024import java.util.Iterator; 025import java.util.Map; 026import java.util.Queue; 027import java.util.Set; 028import java.util.concurrent.atomic.LongAdder; 029import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 030import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 031import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 038import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue; 039import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 040import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.LinkedMap; 041 042/** 043 * This class is used to allocate a block with specified size and free the block when evicting. It 044 * manages an array of buckets, each bucket is associated with a size and caches elements up to this 045 * size. For a completely empty bucket, this size could be re-specified dynamically. 046 * <p/> 047 * This class is not thread safe. 048 */ 049@InterfaceAudience.Private 050public final class BucketAllocator { 051 private static final Logger LOG = LoggerFactory.getLogger(BucketAllocator.class); 052 053 public final static class Bucket { 054 private long baseOffset; 055 private int itemAllocationSize, sizeIndex; 056 private int itemCount; 057 private int freeList[]; 058 private int freeCount, usedCount; 059 060 public Bucket(long offset) { 061 baseOffset = offset; 062 sizeIndex = -1; 063 } 064 065 void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) { 066 Preconditions.checkElementIndex(sizeIndex, bucketSizes.length); 067 this.sizeIndex = sizeIndex; 068 itemAllocationSize = bucketSizes[sizeIndex]; 069 itemCount = (int) (bucketCapacity / (long) itemAllocationSize); 070 freeCount = itemCount; 071 usedCount = 0; 072 freeList = new int[itemCount]; 073 for (int i = 0; i < freeCount; ++i) 074 freeList[i] = i; 075 } 076 077 public boolean isUninstantiated() { 078 return sizeIndex == -1; 079 } 080 081 public int sizeIndex() { 082 return sizeIndex; 083 } 084 085 public int getItemAllocationSize() { 086 return itemAllocationSize; 087 } 088 089 public boolean hasFreeSpace() { 090 return freeCount > 0; 091 } 092 093 public boolean isCompletelyFree() { 094 return usedCount == 0; 095 } 096 097 public int freeCount() { 098 return freeCount; 099 } 100 101 public int usedCount() { 102 return usedCount; 103 } 104 105 public int getFreeBytes() { 106 return freeCount * itemAllocationSize; 107 } 108 109 public int getUsedBytes() { 110 return usedCount * itemAllocationSize; 111 } 112 113 public long getBaseOffset() { 114 return baseOffset; 115 } 116 117 /** 118 * Allocate a block in this bucket, return the offset representing the 119 * position in physical space 120 * @return the offset in the IOEngine 121 */ 122 public long allocate() { 123 assert freeCount > 0; // Else should not have been called 124 assert sizeIndex != -1; 125 ++usedCount; 126 long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize); 127 assert offset >= 0; 128 return offset; 129 } 130 131 public void addAllocation(long offset) throws BucketAllocatorException { 132 offset -= baseOffset; 133 if (offset < 0 || offset % itemAllocationSize != 0) 134 throw new BucketAllocatorException( 135 "Attempt to add allocation for bad offset: " + offset + " base=" 136 + baseOffset + ", bucket size=" + itemAllocationSize); 137 int idx = (int) (offset / itemAllocationSize); 138 boolean matchFound = false; 139 for (int i = 0; i < freeCount; ++i) { 140 if (matchFound) freeList[i - 1] = freeList[i]; 141 else if (freeList[i] == idx) matchFound = true; 142 } 143 if (!matchFound) 144 throw new BucketAllocatorException("Couldn't find match for index " 145 + idx + " in free list"); 146 ++usedCount; 147 --freeCount; 148 } 149 150 private void free(long offset) { 151 offset -= baseOffset; 152 assert offset >= 0; 153 assert offset < itemCount * itemAllocationSize; 154 assert offset % itemAllocationSize == 0; 155 assert usedCount > 0; 156 assert freeCount < itemCount; // Else duplicate free 157 int item = (int) (offset / (long) itemAllocationSize); 158 assert !freeListContains(item); 159 --usedCount; 160 freeList[freeCount++] = item; 161 } 162 163 private boolean freeListContains(int blockNo) { 164 for (int i = 0; i < freeCount; ++i) { 165 if (freeList[i] == blockNo) return true; 166 } 167 return false; 168 } 169 } 170 171 final class BucketSizeInfo { 172 // Free bucket means it has space to allocate a block; 173 // Completely free bucket means it has no block. 174 private LinkedMap bucketList, freeBuckets, completelyFreeBuckets; 175 private int sizeIndex; 176 177 BucketSizeInfo(int sizeIndex) { 178 bucketList = new LinkedMap(); 179 freeBuckets = new LinkedMap(); 180 completelyFreeBuckets = new LinkedMap(); 181 this.sizeIndex = sizeIndex; 182 } 183 184 public synchronized void instantiateBucket(Bucket b) { 185 assert b.isUninstantiated() || b.isCompletelyFree(); 186 b.reconfigure(sizeIndex, bucketSizes, bucketCapacity); 187 bucketList.put(b, b); 188 freeBuckets.put(b, b); 189 completelyFreeBuckets.put(b, b); 190 } 191 192 public int sizeIndex() { 193 return sizeIndex; 194 } 195 196 /** 197 * Find a bucket to allocate a block 198 * @return the offset in the IOEngine 199 */ 200 public long allocateBlock() { 201 Bucket b = null; 202 if (freeBuckets.size() > 0) { 203 // Use up an existing one first... 204 b = (Bucket) freeBuckets.lastKey(); 205 } 206 if (b == null) { 207 b = grabGlobalCompletelyFreeBucket(); 208 if (b != null) instantiateBucket(b); 209 } 210 if (b == null) return -1; 211 long result = b.allocate(); 212 blockAllocated(b); 213 return result; 214 } 215 216 void blockAllocated(Bucket b) { 217 if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b); 218 if (!b.hasFreeSpace()) freeBuckets.remove(b); 219 } 220 221 public Bucket findAndRemoveCompletelyFreeBucket() { 222 Bucket b = null; 223 assert bucketList.size() > 0; 224 if (bucketList.size() == 1) { 225 // So we never get complete starvation of a bucket for a size 226 return null; 227 } 228 229 if (completelyFreeBuckets.size() > 0) { 230 b = (Bucket) completelyFreeBuckets.firstKey(); 231 removeBucket(b); 232 } 233 return b; 234 } 235 236 private synchronized void removeBucket(Bucket b) { 237 assert b.isCompletelyFree(); 238 bucketList.remove(b); 239 freeBuckets.remove(b); 240 completelyFreeBuckets.remove(b); 241 } 242 243 public void freeBlock(Bucket b, long offset) { 244 assert bucketList.containsKey(b); 245 // else we shouldn't have anything to free... 246 assert (!completelyFreeBuckets.containsKey(b)); 247 b.free(offset); 248 if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b); 249 if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b); 250 } 251 252 public synchronized IndexStatistics statistics() { 253 long free = 0, used = 0; 254 for (Object obj : bucketList.keySet()) { 255 Bucket b = (Bucket) obj; 256 free += b.freeCount(); 257 used += b.usedCount(); 258 } 259 return new IndexStatistics(free, used, bucketSizes[sizeIndex]); 260 } 261 262 @Override 263 public String toString() { 264 return MoreObjects.toStringHelper(this.getClass()) 265 .add("sizeIndex", sizeIndex) 266 .add("bucketSize", bucketSizes[sizeIndex]) 267 .toString(); 268 } 269 } 270 271 // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better 272 // reset it according to your cluster's block size distribution 273 // TODO Support the view of block size distribution statistics 274 // TODO: Why we add the extra 1024 bytes? Slop? 275 private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024, 276 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024, 277 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, 278 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024, 279 512 * 1024 + 1024 }; 280 281 /** 282 * Round up the given block size to bucket size, and get the corresponding 283 * BucketSizeInfo 284 */ 285 public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) { 286 for (int i = 0; i < bucketSizes.length; ++i) 287 if (blockSize <= bucketSizes[i]) 288 return bucketSizeInfos[i]; 289 return null; 290 } 291 292 /** 293 * So, what is the minimum amount of items we'll tolerate in a single bucket? 294 */ 295 static public final int FEWEST_ITEMS_IN_BUCKET = 4; 296 297 private final int[] bucketSizes; 298 private final int bigItemSize; 299 // The capacity size for each bucket 300 private final long bucketCapacity; 301 private Bucket[] buckets; 302 private BucketSizeInfo[] bucketSizeInfos; 303 private final long totalSize; 304 private transient long usedSize = 0; 305 306 BucketAllocator(long availableSpace, int[] bucketSizes) 307 throws BucketAllocatorException { 308 this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes; 309 Arrays.sort(this.bucketSizes); 310 this.bigItemSize = Ints.max(this.bucketSizes); 311 this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * (long) bigItemSize; 312 buckets = new Bucket[(int) (availableSpace / bucketCapacity)]; 313 if (buckets.length < this.bucketSizes.length) 314 throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length + 315 "); must have room for at least " + this.bucketSizes.length + " buckets"); 316 bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length]; 317 for (int i = 0; i < this.bucketSizes.length; ++i) { 318 bucketSizeInfos[i] = new BucketSizeInfo(i); 319 } 320 for (int i = 0; i < buckets.length; ++i) { 321 buckets[i] = new Bucket(bucketCapacity * i); 322 bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1] 323 .instantiateBucket(buckets[i]); 324 } 325 this.totalSize = ((long) buckets.length) * bucketCapacity; 326 if (LOG.isInfoEnabled()) { 327 LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length + 328 ", bucket capacity=" + this.bucketCapacity + 329 "=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" + 330 "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))"); 331 } 332 } 333 334 /** 335 * Rebuild the allocator's data structures from a persisted map. 336 * @param availableSpace capacity of cache 337 * @param map A map stores the block key and BucketEntry(block's meta data 338 * like offset, length) 339 * @param realCacheSize cached data size statistics for bucket cache 340 * @throws BucketAllocatorException 341 */ 342 BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, 343 LongAdder realCacheSize) throws BucketAllocatorException { 344 this(availableSpace, bucketSizes); 345 346 // each bucket has an offset, sizeindex. probably the buckets are too big 347 // in our default state. so what we do is reconfigure them according to what 348 // we've found. we can only reconfigure each bucket once; if more than once, 349 // we know there's a bug, so we just log the info, throw, and start again... 350 boolean[] reconfigured = new boolean[buckets.length]; 351 int sizeNotMatchedCount = 0; 352 int insufficientCapacityCount = 0; 353 Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator(); 354 while (iterator.hasNext()) { 355 Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next(); 356 long foundOffset = entry.getValue().offset(); 357 int foundLen = entry.getValue().getLength(); 358 int bucketSizeIndex = -1; 359 for (int i = 0; i < this.bucketSizes.length; ++i) { 360 if (foundLen <= this.bucketSizes[i]) { 361 bucketSizeIndex = i; 362 break; 363 } 364 } 365 if (bucketSizeIndex == -1) { 366 sizeNotMatchedCount++; 367 iterator.remove(); 368 continue; 369 } 370 int bucketNo = (int) (foundOffset / bucketCapacity); 371 if (bucketNo < 0 || bucketNo >= buckets.length) { 372 insufficientCapacityCount++; 373 iterator.remove(); 374 continue; 375 } 376 Bucket b = buckets[bucketNo]; 377 if (reconfigured[bucketNo]) { 378 if (b.sizeIndex() != bucketSizeIndex) { 379 throw new BucketAllocatorException("Inconsistent allocation in bucket map;"); 380 } 381 } else { 382 if (!b.isCompletelyFree()) { 383 throw new BucketAllocatorException( 384 "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data"); 385 } 386 // Need to remove the bucket from whichever list it's currently in at 387 // the moment... 388 BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex]; 389 BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()]; 390 oldbsi.removeBucket(b); 391 bsi.instantiateBucket(b); 392 reconfigured[bucketNo] = true; 393 } 394 realCacheSize.add(foundLen); 395 buckets[bucketNo].addAllocation(foundOffset); 396 usedSize += buckets[bucketNo].getItemAllocationSize(); 397 bucketSizeInfos[bucketSizeIndex].blockAllocated(b); 398 } 399 400 if (sizeNotMatchedCount > 0) { 401 LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " + 402 "there is no matching bucket size for these blocks"); 403 } 404 if (insufficientCapacityCount > 0) { 405 LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - " 406 + "did you shrink the cache?"); 407 } 408 } 409 410 @Override 411 public String toString() { 412 StringBuilder sb = new StringBuilder(1024); 413 for (int i = 0; i < buckets.length; ++i) { 414 Bucket b = buckets[i]; 415 if (i > 0) sb.append(", "); 416 sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize()); 417 sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount()); 418 } 419 return sb.toString(); 420 } 421 422 public long getUsedSize() { 423 return this.usedSize; 424 } 425 426 public long getFreeSize() { 427 return this.totalSize - getUsedSize(); 428 } 429 430 public long getTotalSize() { 431 return this.totalSize; 432 } 433 434 /** 435 * Allocate a block with specified size. Return the offset 436 * @param blockSize size of block 437 * @throws BucketAllocatorException 438 * @throws CacheFullException 439 * @return the offset in the IOEngine 440 */ 441 public synchronized long allocateBlock(int blockSize) throws CacheFullException, 442 BucketAllocatorException { 443 assert blockSize > 0; 444 BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize); 445 if (bsi == null) { 446 throw new BucketAllocatorException("Allocation too big size=" + blockSize + 447 "; adjust BucketCache sizes " + BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY + 448 " to accomodate if size seems reasonable and you want it cached."); 449 } 450 long offset = bsi.allocateBlock(); 451 452 // Ask caller to free up space and try again! 453 if (offset < 0) 454 throw new CacheFullException(blockSize, bsi.sizeIndex()); 455 usedSize += bucketSizes[bsi.sizeIndex()]; 456 return offset; 457 } 458 459 private Bucket grabGlobalCompletelyFreeBucket() { 460 for (BucketSizeInfo bsi : bucketSizeInfos) { 461 Bucket b = bsi.findAndRemoveCompletelyFreeBucket(); 462 if (b != null) return b; 463 } 464 return null; 465 } 466 467 /** 468 * Free a block with the offset 469 * @param offset block's offset 470 * @return size freed 471 */ 472 public synchronized int freeBlock(long offset) { 473 int bucketNo = (int) (offset / bucketCapacity); 474 assert bucketNo >= 0 && bucketNo < buckets.length; 475 Bucket targetBucket = buckets[bucketNo]; 476 bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset); 477 usedSize -= targetBucket.getItemAllocationSize(); 478 return targetBucket.getItemAllocationSize(); 479 } 480 481 public int sizeIndexOfAllocation(long offset) { 482 int bucketNo = (int) (offset / bucketCapacity); 483 assert bucketNo >= 0 && bucketNo < buckets.length; 484 Bucket targetBucket = buckets[bucketNo]; 485 return targetBucket.sizeIndex(); 486 } 487 488 public int sizeOfAllocation(long offset) { 489 int bucketNo = (int) (offset / bucketCapacity); 490 assert bucketNo >= 0 && bucketNo < buckets.length; 491 Bucket targetBucket = buckets[bucketNo]; 492 return targetBucket.getItemAllocationSize(); 493 } 494 495 static class IndexStatistics { 496 private long freeCount, usedCount, itemSize, totalCount; 497 498 public long freeCount() { 499 return freeCount; 500 } 501 502 public long usedCount() { 503 return usedCount; 504 } 505 506 public long totalCount() { 507 return totalCount; 508 } 509 510 public long freeBytes() { 511 return freeCount * itemSize; 512 } 513 514 public long usedBytes() { 515 return usedCount * itemSize; 516 } 517 518 public long totalBytes() { 519 return totalCount * itemSize; 520 } 521 522 public long itemSize() { 523 return itemSize; 524 } 525 526 public IndexStatistics(long free, long used, long itemSize) { 527 setTo(free, used, itemSize); 528 } 529 530 public IndexStatistics() { 531 setTo(-1, -1, 0); 532 } 533 534 public void setTo(long free, long used, long itemSize) { 535 this.itemSize = itemSize; 536 this.freeCount = free; 537 this.usedCount = used; 538 this.totalCount = free + used; 539 } 540 } 541 542 public Bucket [] getBuckets() { 543 return this.buckets; 544 } 545 546 void logStatistics() { 547 IndexStatistics total = new IndexStatistics(); 548 IndexStatistics[] stats = getIndexStatistics(total); 549 LOG.info("Bucket allocator statistics follow:\n"); 550 LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes=" 551 + total.usedBytes() + "; total bytes=" + total.totalBytes()); 552 for (IndexStatistics s : stats) { 553 LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount() 554 + "; free=" + s.freeCount() + "; total=" + s.totalCount()); 555 } 556 } 557 558 IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) { 559 IndexStatistics[] stats = getIndexStatistics(); 560 long totalfree = 0, totalused = 0; 561 for (IndexStatistics stat : stats) { 562 totalfree += stat.freeBytes(); 563 totalused += stat.usedBytes(); 564 } 565 grandTotal.setTo(totalfree, totalused, 1); 566 return stats; 567 } 568 569 IndexStatistics[] getIndexStatistics() { 570 IndexStatistics[] stats = new IndexStatistics[bucketSizes.length]; 571 for (int i = 0; i < stats.length; ++i) 572 stats[i] = bucketSizeInfos[i].statistics(); 573 return stats; 574 } 575 576 public long freeBlock(long freeList[]) { 577 long sz = 0; 578 for (int i = 0; i < freeList.length; ++i) 579 sz += freeBlock(freeList[i]); 580 return sz; 581 } 582 583 public int getBucketIndex(long offset) { 584 return (int) (offset / bucketCapacity); 585 } 586 587 /** 588 * Returns a set of indices of the buckets that are least filled 589 * excluding the offsets, we also the fully free buckets for the 590 * BucketSizes where everything is empty and they only have one 591 * completely free bucket as a reserved 592 * 593 * @param excludedBuckets the buckets that need to be excluded due to 594 * currently being in used 595 * @param bucketCount max Number of buckets to return 596 * @return set of bucket indices which could be used for eviction 597 */ 598 public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets, 599 int bucketCount) { 600 Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy( 601 new Comparator<Integer>() { 602 @Override 603 public int compare(Integer left, Integer right) { 604 // We will always get instantiated buckets 605 return Float.compare( 606 ((float) buckets[left].usedCount) / buckets[left].itemCount, 607 ((float) buckets[right].usedCount) / buckets[right].itemCount); 608 } 609 }).maximumSize(bucketCount).create(); 610 611 for (int i = 0; i < buckets.length; i ++ ) { 612 if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() && 613 // Avoid the buckets that are the only buckets for a sizeIndex 614 bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) { 615 queue.add(i); 616 } 617 } 618 619 Set<Integer> result = new HashSet<>(bucketCount); 620 result.addAll(queue); 621 622 return result; 623 } 624}