/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile.bucket;

import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.LinkedMap;

/**
 * This class is used to allocate a block with specified size and free the block when evicting. It
 * manages an array of buckets, each bucket is associated with a size and caches elements up to this
 * size. For a completely empty bucket, this size could be re-specified dynamically.
 * <p>
 * This class is not thread safe.
 */
@InterfaceAudience.Private
public final class BucketAllocator {
  private static final Logger LOG = LoggerFactory.getLogger(BucketAllocator.class);

  /**
   * A fixed-capacity region starting at {@code baseOffset} that is divided into equally sized
   * items. Free item indices are kept in {@code freeList[0..freeCount)}.
   */
  public static final class Bucket {
    private final long baseOffset;
    private int itemAllocationSize, sizeIndex;
    private int itemCount;
    private int[] freeList;
    private int freeCount, usedCount;

    public Bucket(long offset) {
      baseOffset = offset;
      // -1 marks a bucket that has not been assigned an item size yet.
      sizeIndex = -1;
    }

    /**
     * (Re)assign this bucket to the given size index and mark every item as free.
     * @param sizeIndex index into {@code bucketSizes} selecting the item size
     * @param bucketSizes all configured item sizes
     * @param bucketCapacity number of bytes this bucket spans
     */
    void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
      Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
      this.sizeIndex = sizeIndex;
      itemAllocationSize = bucketSizes[sizeIndex];
      itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
      freeCount = itemCount;
      usedCount = 0;
      freeList = new int[itemCount];
      for (int i = 0; i < freeCount; ++i) {
        freeList[i] = i;
      }
    }

    public boolean isUninstantiated() {
      return sizeIndex == -1;
    }

    public int sizeIndex() {
      return sizeIndex;
    }

    public int getItemAllocationSize() {
      return itemAllocationSize;
    }

    public boolean hasFreeSpace() {
      return freeCount > 0;
    }

    public boolean isCompletelyFree() {
      return usedCount == 0;
    }

    public int freeCount() {
      return freeCount;
    }

    public int usedCount() {
      return usedCount;
    }

    /**
     * @return free items times item size. NOTE: computed and returned as {@code int}, so the
     *         value overflows if it exceeds {@link Integer#MAX_VALUE}.
     */
    public int getFreeBytes() {
      return freeCount * itemAllocationSize;
    }

    /**
     * @return used items times item size. NOTE: computed and returned as {@code int}, so the
     *         value overflows if it exceeds {@link Integer#MAX_VALUE}.
     */
    public int getUsedBytes() {
      return usedCount * itemAllocationSize;
    }

    public long getBaseOffset() {
      return baseOffset;
    }

    /**
     * Allocate a block in this bucket, return the offset representing the
     * position in physical space
     * @return the offset in the IOEngine
     */
    public long allocate() {
      assert freeCount > 0; // Else should not have been called
      assert sizeIndex != -1;
      ++usedCount;
      // Multiply in long: item index * item size can exceed Integer.MAX_VALUE when large bucket
      // sizes are configured, and an int product would wrap before being added to baseOffset.
      long offset = baseOffset + (freeList[--freeCount] * (long) itemAllocationSize);
      assert offset >= 0;
      return offset;
    }

    /**
     * Mark the item at the given absolute offset as used by removing its index from the free
     * list.
     * @param offset absolute offset of the item
     * @throws BucketAllocatorException if the offset lies before this bucket, is not aligned to
     *           the item size, or its item index is not currently in the free list
     */
    public void addAllocation(long offset) throws BucketAllocatorException {
      offset -= baseOffset;
      if (offset < 0 || offset % itemAllocationSize != 0) {
        throw new BucketAllocatorException(
            "Attempt to add allocation for bad offset: " + offset + " base="
                + baseOffset + ", bucket size=" + itemAllocationSize);
      }
      int idx = (int) (offset / itemAllocationSize);
      boolean matchFound = false;
      // Once the index is found, shift every later entry one slot to the left to close the gap.
      for (int i = 0; i < freeCount; ++i) {
        if (matchFound) {
          freeList[i - 1] = freeList[i];
        } else if (freeList[i] == idx) {
          matchFound = true;
        }
      }
      if (!matchFound) {
        throw new BucketAllocatorException("Couldn't find match for index "
            + idx + " in free list");
      }
      ++usedCount;
      --freeCount;
    }

    /**
     * Return the item at the given absolute offset to the free list.
     */
    private void free(long offset) {
      offset -= baseOffset;
      assert offset >= 0;
      // Compare in long so the bound itself cannot overflow for large buckets.
      assert offset < itemCount * (long) itemAllocationSize;
      assert offset % itemAllocationSize == 0;
      assert usedCount > 0;
      assert freeCount < itemCount; // Else duplicate free
      int item = (int) (offset / (long) itemAllocationSize);
      assert !freeListContains(item);
      --usedCount;
      freeList[freeCount++] = item;
    }

    private boolean freeListContains(int blockNo) {
      for (int i = 0; i < freeCount; ++i) {
        if (freeList[i] == blockNo) {
          return true;
        }
      }
      return false;
    }
  }

  /**
   * Tracks the buckets currently configured for one item size.
   */
  final class BucketSizeInfo {
    // Free bucket means it has space to allocate a block;
    // Completely free bucket means it has no block.
    private final LinkedMap<Bucket, Bucket> bucketList, freeBuckets, completelyFreeBuckets;
    private final int sizeIndex;

    BucketSizeInfo(int sizeIndex) {
      bucketList = new LinkedMap<>();
      freeBuckets = new LinkedMap<>();
      completelyFreeBuckets = new LinkedMap<>();
      this.sizeIndex = sizeIndex;
    }

    /**
     * Reconfigure the bucket for this size and register it in all three lists.
     */
    public synchronized void instantiateBucket(Bucket b) {
      assert b.isUninstantiated() || b.isCompletelyFree();
      b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
      bucketList.put(b, b);
      freeBuckets.put(b, b);
      completelyFreeBuckets.put(b, b);
    }

    public int sizeIndex() {
      return sizeIndex;
    }

    /**
     * Find a bucket to allocate a block
     * @return the offset in the IOEngine, or -1 if no bucket with free space could be found
     */
    public long allocateBlock() {
      Bucket b = null;
      if (freeBuckets.size() > 0) {
        // Use up an existing one first...
        b = freeBuckets.lastKey();
      }
      if (b == null) {
        // Otherwise take a completely free bucket from any size and convert it to this size.
        b = grabGlobalCompletelyFreeBucket();
        if (b != null) {
          instantiateBucket(b);
        }
      }
      if (b == null) {
        return -1;
      }
      long result = b.allocate();
      blockAllocated(b);
      return result;
    }

    /**
     * Update the free / completely-free lists after a block was allocated in the bucket.
     */
    void blockAllocated(Bucket b) {
      if (!b.isCompletelyFree()) {
        completelyFreeBuckets.remove(b);
      }
      if (!b.hasFreeSpace()) {
        freeBuckets.remove(b);
      }
    }

    /**
     * Detach and return one completely free bucket of this size.
     * @return the detached bucket, or null if there is none or if this size has only one bucket
     */
    public Bucket findAndRemoveCompletelyFreeBucket() {
      Bucket b = null;
      assert bucketList.size() > 0;
      if (bucketList.size() == 1) {
        // So we never get complete starvation of a bucket for a size
        return null;
      }

      if (completelyFreeBuckets.size() > 0) {
        b = completelyFreeBuckets.firstKey();
        removeBucket(b);
      }
      return b;
    }

    private synchronized void removeBucket(Bucket b) {
      assert b.isCompletelyFree();
      bucketList.remove(b);
      freeBuckets.remove(b);
      completelyFreeBuckets.remove(b);
    }

    /**
     * Free the item at the given offset in the bucket and update the free lists accordingly.
     */
    public void freeBlock(Bucket b, long offset) {
      assert bucketList.containsKey(b);
      // else we shouldn't have anything to free...
      assert (!completelyFreeBuckets.containsKey(b));
      b.free(offset);
      if (!freeBuckets.containsKey(b)) {
        freeBuckets.put(b, b);
      }
      if (b.isCompletelyFree()) {
        completelyFreeBuckets.put(b, b);
      }
    }

    /**
     * @return free and used item counts summed over all buckets of this size
     */
    public synchronized IndexStatistics statistics() {
      long free = 0, used = 0;
      for (Bucket b : bucketList.keySet()) {
        free += b.freeCount();
        used += b.usedCount();
      }
      return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this.getClass())
        .add("sizeIndex", sizeIndex)
        .add("bucketSize", bucketSizes[sizeIndex])
        .toString();
    }
  }

  // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better
  // reset it according to your cluster's block size distribution
  // The real block size in hfile maybe a little larger than the size we configured ,
  // so we need add extra 1024 bytes for fit.
  // TODO Support the view of block size distribution statistics
  private static final int[] DEFAULT_BUCKET_SIZES = { 4 * 1024 + 1024, 8 * 1024 + 1024,
      16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
      56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
      192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
      512 * 1024 + 1024 };

  /**
   * Round up the given block size to bucket size, and get the corresponding
   * BucketSizeInfo
   * @return the info for the smallest bucket size that fits, or null if the block is larger than
   *         every configured bucket size
   */
  public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
    for (int i = 0; i < bucketSizes.length; ++i) {
      if (blockSize <= bucketSizes[i]) {
        return bucketSizeInfos[i];
      }
    }
    return null;
  }

  /**
   * So, what is the minimum amount of items we'll tolerate in a single bucket?
   */
  public static final int FEWEST_ITEMS_IN_BUCKET = 4;

  private final int[] bucketSizes;
  private final int bigItemSize;
  // The capacity size for each bucket
  private final long bucketCapacity;
  private final Bucket[] buckets;
  private final BucketSizeInfo[] bucketSizeInfos;
  private final long totalSize;
  private transient long usedSize = 0;

  /**
   * Create an allocator over the given space.
   * @param availableSpace capacity of cache
   * @param bucketSizes item sizes to support, or null to use the defaults; the array is sorted
   *          in place
   * @throws BucketAllocatorException if the space holds fewer buckets than there are sizes
   */
  BucketAllocator(long availableSpace, int[] bucketSizes)
      throws BucketAllocatorException {
    this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
    Arrays.sort(this.bucketSizes);
    this.bigItemSize = Ints.max(this.bucketSizes);
    this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * (long) bigItemSize;
    buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
    if (buckets.length < this.bucketSizes.length) {
      throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length +
        "); must have room for at least " + this.bucketSizes.length + " buckets");
    }
    bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
    for (int i = 0; i < this.bucketSizes.length; ++i) {
      bucketSizeInfos[i] = new BucketSizeInfo(i);
    }
    // One bucket per size first; all remaining buckets start out with the largest size.
    for (int i = 0; i < buckets.length; ++i) {
      buckets[i] = new Bucket(bucketCapacity * i);
      bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
        .instantiateBucket(buckets[i]);
    }
    this.totalSize = ((long) buckets.length) * bucketCapacity;
    if (LOG.isInfoEnabled()) {
      LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length +
        ", bucket capacity=" + this.bucketCapacity +
        "=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" +
        "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))");
    }
  }

  /**
   * Rebuild the allocator's data structures from a persisted map.
   * @param availableSpace capacity of cache
   * @param bucketSizes item sizes to support, or null to use the defaults
   * @param map A map stores the block key and BucketEntry(block's meta data
   *          like offset, length); entries that cannot be placed are removed from it
   * @param realCacheSize cached data size statistics for bucket cache
   * @throws BucketAllocatorException if the persisted entries are inconsistent with each other
   *           or cannot be mapped onto the bucket layout
   */
  BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
      LongAdder realCacheSize) throws BucketAllocatorException {
    this(availableSpace, bucketSizes);

    // each bucket has an offset, sizeindex. probably the buckets are too big
    // in our default state. so what we do is reconfigure them according to what
    // we've found. we can only reconfigure each bucket once; if more than once,
    // we know there's a bug, so we just log the info, throw, and start again...
    boolean[] reconfigured = new boolean[buckets.length];
    int sizeNotMatchedCount = 0;
    int insufficientCapacityCount = 0;
    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
      long foundOffset = entry.getValue().offset();
      int foundLen = entry.getValue().getLength();
      int bucketSizeIndex = -1;
      for (int i = 0; i < this.bucketSizes.length; ++i) {
        if (foundLen <= this.bucketSizes[i]) {
          bucketSizeIndex = i;
          break;
        }
      }
      if (bucketSizeIndex == -1) {
        sizeNotMatchedCount++;
        iterator.remove();
        continue;
      }
      int bucketNo = (int) (foundOffset / bucketCapacity);
      if (bucketNo < 0 || bucketNo >= buckets.length) {
        insufficientCapacityCount++;
        iterator.remove();
        continue;
      }
      Bucket b = buckets[bucketNo];
      if (reconfigured[bucketNo]) {
        if (b.sizeIndex() != bucketSizeIndex) {
          throw new BucketAllocatorException("Inconsistent allocation in bucket map;");
        }
      } else {
        if (!b.isCompletelyFree()) {
          throw new BucketAllocatorException(
              "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data");
        }
        // Need to remove the bucket from whichever list it's currently in at
        // the moment...
        BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
        BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
        oldbsi.removeBucket(b);
        bsi.instantiateBucket(b);
        reconfigured[bucketNo] = true;
      }
      realCacheSize.add(foundLen);
      b.addAllocation(foundOffset);
      usedSize += b.getItemAllocationSize();
      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
    }

    if (sizeNotMatchedCount > 0) {
      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " +
        "there is no matching bucket size for these blocks");
    }
    if (insufficientCapacityCount > 0) {
      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
        + "did you shrink the cache?");
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder(1024);
    for (int i = 0; i < buckets.length; ++i) {
      Bucket b = buckets[i];
      if (i > 0) {
        sb.append(", ");
      }
      sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
      sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
    }
    return sb.toString();
  }

  public long getUsedSize() {
    return this.usedSize;
  }

  public long getFreeSize() {
    return this.totalSize - getUsedSize();
  }

  public long getTotalSize() {
    return this.totalSize;
  }

  /**
   * Allocate a block with specified size. Return the offset
   * @param blockSize size of block
   * @throws BucketAllocatorException if the block is larger than every configured bucket size
   * @throws CacheFullException if no bucket with free space is available for the block's size
   * @return the offset in the IOEngine
   */
  public synchronized long allocateBlock(int blockSize) throws CacheFullException,
      BucketAllocatorException {
    assert blockSize > 0;
    BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
    if (bsi == null) {
      throw new BucketAllocatorException("Allocation too big size=" + blockSize +
        "; adjust BucketCache sizes " + BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY +
        " to accomodate if size seems reasonable and you want it cached.");
    }
    long offset = bsi.allocateBlock();

    // Ask caller to free up space and try again!
    if (offset < 0) {
      throw new CacheFullException(blockSize, bsi.sizeIndex());
    }
    usedSize += bucketSizes[bsi.sizeIndex()];
    return offset;
  }

  /**
   * Take a completely free bucket from the first size that can spare one.
   * @return the bucket, or null if no size can give one up
   */
  private Bucket grabGlobalCompletelyFreeBucket() {
    for (BucketSizeInfo bsi : bucketSizeInfos) {
      Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
      if (b != null) {
        return b;
      }
    }
    return null;
  }

  /**
   * Free a block with the offset
   * @param offset block's offset
   * @return size freed
   */
  public synchronized int freeBlock(long offset) {
    int bucketNo = (int) (offset / bucketCapacity);
    assert bucketNo >= 0 && bucketNo < buckets.length;
    Bucket targetBucket = buckets[bucketNo];
    bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
    usedSize -= targetBucket.getItemAllocationSize();
    return targetBucket.getItemAllocationSize();
  }

  /**
   * @return the size index of the bucket that contains the given offset
   */
  public int sizeIndexOfAllocation(long offset) {
    int bucketNo = (int) (offset / bucketCapacity);
    assert bucketNo >= 0 && bucketNo < buckets.length;
    Bucket targetBucket = buckets[bucketNo];
    return targetBucket.sizeIndex();
  }

  /**
   * @return the item size of the bucket that contains the given offset
   */
  public int sizeOfAllocation(long offset) {
    int bucketNo = (int) (offset / bucketCapacity);
    assert bucketNo >= 0 && bucketNo < buckets.length;
    Bucket targetBucket = buckets[bucketNo];
    return targetBucket.getItemAllocationSize();
  }

  /**
   * Free / used counts together with the item size they are measured in.
   */
  static class IndexStatistics {
    private long freeCount, usedCount, itemSize, totalCount;

    public long freeCount() {
      return freeCount;
    }

    public long usedCount() {
      return usedCount;
    }

    public long totalCount() {
      return totalCount;
    }

    public long freeBytes() {
      return freeCount * itemSize;
    }

    public long usedBytes() {
      return usedCount * itemSize;
    }

    public long totalBytes() {
      return totalCount * itemSize;
    }

    public long itemSize() {
      return itemSize;
    }

    public IndexStatistics(long free, long used, long itemSize) {
      setTo(free, used, itemSize);
    }

    public IndexStatistics() {
      setTo(-1, -1, 0);
    }

    public void setTo(long free, long used, long itemSize) {
      this.itemSize = itemSize;
      this.freeCount = free;
      this.usedCount = used;
      this.totalCount = free + used;
    }
  }

  public Bucket[] getBuckets() {
    return this.buckets;
  }

  void logStatistics() {
    IndexStatistics total = new IndexStatistics();
    IndexStatistics[] stats = getIndexStatistics(total);
    LOG.info("Bucket allocator statistics follow:\n");
    LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
        + total.usedBytes() + "; total bytes=" + total.totalBytes());
    for (IndexStatistics s : stats) {
      LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
          + "; free=" + s.freeCount() + "; total=" + s.totalCount());
    }
  }

  /**
   * Collect per-size statistics and store the byte totals in {@code grandTotal}.
   * @param grandTotal receives total free and used bytes, expressed with an item size of 1
   * @return one statistics entry per bucket size
   */
  IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
    IndexStatistics[] stats = getIndexStatistics();
    long totalfree = 0, totalused = 0;
    for (IndexStatistics stat : stats) {
      totalfree += stat.freeBytes();
      totalused += stat.usedBytes();
    }
    grandTotal.setTo(totalfree, totalused, 1);
    return stats;
  }

  IndexStatistics[] getIndexStatistics() {
    IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
    for (int i = 0; i < stats.length; ++i) {
      stats[i] = bucketSizeInfos[i].statistics();
    }
    return stats;
  }

  /**
   * Free every block in the given list of offsets.
   * @param freeList offsets of the blocks to free
   * @return total size freed
   */
  public long freeBlock(long[] freeList) {
    long sz = 0;
    for (int i = 0; i < freeList.length; ++i) {
      sz += freeBlock(freeList[i]);
    }
    return sz;
  }

  public int getBucketIndex(long offset) {
    return (int) (offset / bucketCapacity);
  }

  /**
   * Returns a set of indices of the buckets that are least filled, skipping the excluded
   * buckets. Buckets that are the only bucket for their size are skipped as well, so that each
   * size keeps one bucket in reserve.
   *
   * @param excludedBuckets the buckets that need to be excluded due to
   *          currently being in used
   * @param bucketCount max Number of buckets to return
   * @return set of bucket indices which could be used for eviction
   */
  public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
      int bucketCount) {
    // Order bucket indices by fill ratio (used items / total items).
    // We will always get instantiated buckets
    Comparator<Integer> byFillRatio = (left, right) -> Float.compare(
      ((float) buckets[left].usedCount) / buckets[left].itemCount,
      ((float) buckets[right].usedCount) / buckets[right].itemCount);
    Queue<Integer> queue =
      MinMaxPriorityQueue.<Integer>orderedBy(byFillRatio).maximumSize(bucketCount).create();

    for (int i = 0; i < buckets.length; i++) {
      if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
        // Avoid the buckets that are the only buckets for a sizeIndex
        bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
        queue.add(i);
      }
    }

    Set<Integer> result = new HashSet<>(bucketCount);
    result.addAll(queue);

    return result;
  }
}