001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package org.apache.hadoop.hbase.io.hfile.bucket; 022 023import java.util.Arrays; 024import java.util.Comparator; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.Map; 028import java.util.Queue; 029import java.util.Set; 030import java.util.concurrent.atomic.LongAdder; 031 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 036import org.apache.hadoop.hbase.io.hfile.CacheConfig; 037import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; 038import com.fasterxml.jackson.annotation.JsonIgnoreProperties; 039 040import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 041import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 042import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue; 043import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 044import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.LinkedMap; 045 046/** 047 * This class is used to allocate a block with specified size and free the block 048 * when evicting. It manages an array of buckets, each bucket is associated with 049 * a size and caches elements up to this size. For a completely empty bucket, this 050 * size could be re-specified dynamically. 051 * 052 * This class is not thread safe. 053 */ 054@InterfaceAudience.Private 055@JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"}) 056public final class BucketAllocator { 057 private static final Logger LOG = LoggerFactory.getLogger(BucketAllocator.class); 058 059 @JsonIgnoreProperties({"completelyFree", "uninstantiated"}) 060 public final static class Bucket { 061 private long baseOffset; 062 private int itemAllocationSize, sizeIndex; 063 private int itemCount; 064 private int freeList[]; 065 private int freeCount, usedCount; 066 067 public Bucket(long offset) { 068 baseOffset = offset; 069 sizeIndex = -1; 070 } 071 072 void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) { 073 Preconditions.checkElementIndex(sizeIndex, bucketSizes.length); 074 this.sizeIndex = sizeIndex; 075 itemAllocationSize = bucketSizes[sizeIndex]; 076 itemCount = (int) (bucketCapacity / (long) itemAllocationSize); 077 freeCount = itemCount; 078 usedCount = 0; 079 freeList = new int[itemCount]; 080 for (int i = 0; i < freeCount; ++i) 081 freeList[i] = i; 082 } 083 084 public boolean isUninstantiated() { 085 return sizeIndex == -1; 086 } 087 088 public int sizeIndex() { 089 return sizeIndex; 090 } 091 092 public int getItemAllocationSize() { 093 return itemAllocationSize; 094 } 095 096 public boolean hasFreeSpace() { 097 return freeCount > 0; 098 } 099 100 public boolean isCompletelyFree() { 101 return usedCount == 0; 102 } 103 104 public int freeCount() { 105 return freeCount; 106 } 107 108 public int usedCount() { 109 return usedCount; 110 } 111 112 public int getFreeBytes() { 113 return freeCount * itemAllocationSize; 114 } 115 116 public int getUsedBytes() { 117 return usedCount * itemAllocationSize; 118 } 119 120 public long getBaseOffset() { 121 return baseOffset; 122 } 123 124 /** 125 * Allocate a block in this bucket, return the offset representing the 126 * position in physical space 127 * @return the offset in the IOEngine 128 */ 129 public long allocate() { 130 assert freeCount > 0; // Else should not have been called 131 assert sizeIndex != -1; 132 ++usedCount; 133 long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize); 134 assert offset >= 0; 135 return offset; 136 } 137 138 public void addAllocation(long offset) throws BucketAllocatorException { 139 offset -= baseOffset; 140 if (offset < 0 || offset % itemAllocationSize != 0) 141 throw new BucketAllocatorException( 142 "Attempt to add allocation for bad offset: " + offset + " base=" 143 + baseOffset + ", bucket size=" + itemAllocationSize); 144 int idx = (int) (offset / itemAllocationSize); 145 boolean matchFound = false; 146 for (int i = 0; i < freeCount; ++i) { 147 if (matchFound) freeList[i - 1] = freeList[i]; 148 else if (freeList[i] == idx) matchFound = true; 149 } 150 if (!matchFound) 151 throw new BucketAllocatorException("Couldn't find match for index " 152 + idx + " in free list"); 153 ++usedCount; 154 --freeCount; 155 } 156 157 private void free(long offset) { 158 offset -= baseOffset; 159 assert offset >= 0; 160 assert offset < itemCount * itemAllocationSize; 161 assert offset % itemAllocationSize == 0; 162 assert usedCount > 0; 163 assert freeCount < itemCount; // Else duplicate free 164 int item = (int) (offset / (long) itemAllocationSize); 165 assert !freeListContains(item); 166 --usedCount; 167 freeList[freeCount++] = item; 168 } 169 170 private boolean freeListContains(int blockNo) { 171 for (int i = 0; i < freeCount; ++i) { 172 if (freeList[i] == blockNo) return true; 173 } 174 return false; 175 } 176 } 177 178 final class BucketSizeInfo { 179 // Free bucket means it has space to allocate a block; 180 // Completely free bucket means it has no block. 181 private LinkedMap bucketList, freeBuckets, completelyFreeBuckets; 182 private int sizeIndex; 183 184 BucketSizeInfo(int sizeIndex) { 185 bucketList = new LinkedMap(); 186 freeBuckets = new LinkedMap(); 187 completelyFreeBuckets = new LinkedMap(); 188 this.sizeIndex = sizeIndex; 189 } 190 191 public synchronized void instantiateBucket(Bucket b) { 192 assert b.isUninstantiated() || b.isCompletelyFree(); 193 b.reconfigure(sizeIndex, bucketSizes, bucketCapacity); 194 bucketList.put(b, b); 195 freeBuckets.put(b, b); 196 completelyFreeBuckets.put(b, b); 197 } 198 199 public int sizeIndex() { 200 return sizeIndex; 201 } 202 203 /** 204 * Find a bucket to allocate a block 205 * @return the offset in the IOEngine 206 */ 207 public long allocateBlock() { 208 Bucket b = null; 209 if (freeBuckets.size() > 0) { 210 // Use up an existing one first... 211 b = (Bucket) freeBuckets.lastKey(); 212 } 213 if (b == null) { 214 b = grabGlobalCompletelyFreeBucket(); 215 if (b != null) instantiateBucket(b); 216 } 217 if (b == null) return -1; 218 long result = b.allocate(); 219 blockAllocated(b); 220 return result; 221 } 222 223 void blockAllocated(Bucket b) { 224 if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b); 225 if (!b.hasFreeSpace()) freeBuckets.remove(b); 226 } 227 228 public Bucket findAndRemoveCompletelyFreeBucket() { 229 Bucket b = null; 230 assert bucketList.size() > 0; 231 if (bucketList.size() == 1) { 232 // So we never get complete starvation of a bucket for a size 233 return null; 234 } 235 236 if (completelyFreeBuckets.size() > 0) { 237 b = (Bucket) completelyFreeBuckets.firstKey(); 238 removeBucket(b); 239 } 240 return b; 241 } 242 243 private synchronized void removeBucket(Bucket b) { 244 assert b.isCompletelyFree(); 245 bucketList.remove(b); 246 freeBuckets.remove(b); 247 completelyFreeBuckets.remove(b); 248 } 249 250 public void freeBlock(Bucket b, long offset) { 251 assert bucketList.containsKey(b); 252 // else we shouldn't have anything to free... 253 assert (!completelyFreeBuckets.containsKey(b)); 254 b.free(offset); 255 if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b); 256 if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b); 257 } 258 259 public synchronized IndexStatistics statistics() { 260 long free = 0, used = 0; 261 for (Object obj : bucketList.keySet()) { 262 Bucket b = (Bucket) obj; 263 free += b.freeCount(); 264 used += b.usedCount(); 265 } 266 return new IndexStatistics(free, used, bucketSizes[sizeIndex]); 267 } 268 269 @Override 270 public String toString() { 271 return MoreObjects.toStringHelper(this.getClass()) 272 .add("sizeIndex", sizeIndex) 273 .add("bucketSize", bucketSizes[sizeIndex]) 274 .toString(); 275 } 276 } 277 278 // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better 279 // reset it according to your cluster's block size distribution 280 // TODO Support the view of block size distribution statistics 281 // TODO: Why we add the extra 1024 bytes? Slop? 282 private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024, 283 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024, 284 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, 285 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024, 286 512 * 1024 + 1024 }; 287 288 /** 289 * Round up the given block size to bucket size, and get the corresponding 290 * BucketSizeInfo 291 */ 292 public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) { 293 for (int i = 0; i < bucketSizes.length; ++i) 294 if (blockSize <= bucketSizes[i]) 295 return bucketSizeInfos[i]; 296 return null; 297 } 298 299 /** 300 * So, what is the minimum amount of items we'll tolerate in a single bucket? 301 */ 302 static public final int FEWEST_ITEMS_IN_BUCKET = 4; 303 304 private final int[] bucketSizes; 305 private final int bigItemSize; 306 // The capacity size for each bucket 307 private final long bucketCapacity; 308 private Bucket[] buckets; 309 private BucketSizeInfo[] bucketSizeInfos; 310 private final long totalSize; 311 private long usedSize = 0; 312 313 BucketAllocator(long availableSpace, int[] bucketSizes) 314 throws BucketAllocatorException { 315 this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes; 316 Arrays.sort(this.bucketSizes); 317 this.bigItemSize = Ints.max(this.bucketSizes); 318 this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * (long) bigItemSize; 319 buckets = new Bucket[(int) (availableSpace / bucketCapacity)]; 320 if (buckets.length < this.bucketSizes.length) 321 throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length + 322 "); must have room for at least " + this.bucketSizes.length + " buckets"); 323 bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length]; 324 for (int i = 0; i < this.bucketSizes.length; ++i) { 325 bucketSizeInfos[i] = new BucketSizeInfo(i); 326 } 327 for (int i = 0; i < buckets.length; ++i) { 328 buckets[i] = new Bucket(bucketCapacity * i); 329 bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1] 330 .instantiateBucket(buckets[i]); 331 } 332 this.totalSize = ((long) buckets.length) * bucketCapacity; 333 if (LOG.isInfoEnabled()) { 334 LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length + 335 ", bucket capacity=" + this.bucketCapacity + 336 "=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" + 337 "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))"); 338 } 339 } 340 341 /** 342 * Rebuild the allocator's data structures from a persisted map. 343 * @param availableSpace capacity of cache 344 * @param map A map stores the block key and BucketEntry(block's meta data 345 * like offset, length) 346 * @param realCacheSize cached data size statistics for bucket cache 347 * @throws BucketAllocatorException 348 */ 349 BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, 350 LongAdder realCacheSize) throws BucketAllocatorException { 351 this(availableSpace, bucketSizes); 352 353 // each bucket has an offset, sizeindex. probably the buckets are too big 354 // in our default state. so what we do is reconfigure them according to what 355 // we've found. we can only reconfigure each bucket once; if more than once, 356 // we know there's a bug, so we just log the info, throw, and start again... 357 boolean[] reconfigured = new boolean[buckets.length]; 358 int sizeNotMatchedCount = 0; 359 int insufficientCapacityCount = 0; 360 Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator(); 361 while (iterator.hasNext()) { 362 Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next(); 363 long foundOffset = entry.getValue().offset(); 364 int foundLen = entry.getValue().getLength(); 365 int bucketSizeIndex = -1; 366 for (int i = 0; i < this.bucketSizes.length; ++i) { 367 if (foundLen <= this.bucketSizes[i]) { 368 bucketSizeIndex = i; 369 break; 370 } 371 } 372 if (bucketSizeIndex == -1) { 373 sizeNotMatchedCount++; 374 iterator.remove(); 375 continue; 376 } 377 int bucketNo = (int) (foundOffset / bucketCapacity); 378 if (bucketNo < 0 || bucketNo >= buckets.length) { 379 insufficientCapacityCount++; 380 iterator.remove(); 381 continue; 382 } 383 Bucket b = buckets[bucketNo]; 384 if (reconfigured[bucketNo]) { 385 if (b.sizeIndex() != bucketSizeIndex) { 386 throw new BucketAllocatorException("Inconsistent allocation in bucket map;"); 387 } 388 } else { 389 if (!b.isCompletelyFree()) { 390 throw new BucketAllocatorException( 391 "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data"); 392 } 393 // Need to remove the bucket from whichever list it's currently in at 394 // the moment... 395 BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex]; 396 BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()]; 397 oldbsi.removeBucket(b); 398 bsi.instantiateBucket(b); 399 reconfigured[bucketNo] = true; 400 } 401 realCacheSize.add(foundLen); 402 buckets[bucketNo].addAllocation(foundOffset); 403 usedSize += buckets[bucketNo].getItemAllocationSize(); 404 bucketSizeInfos[bucketSizeIndex].blockAllocated(b); 405 } 406 407 if (sizeNotMatchedCount > 0) { 408 LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " + 409 "there is no matching bucket size for these blocks"); 410 } 411 if (insufficientCapacityCount > 0) { 412 LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - " 413 + "did you shrink the cache?"); 414 } 415 } 416 417 @Override 418 public String toString() { 419 StringBuilder sb = new StringBuilder(1024); 420 for (int i = 0; i < buckets.length; ++i) { 421 Bucket b = buckets[i]; 422 if (i > 0) sb.append(", "); 423 sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize()); 424 sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount()); 425 } 426 return sb.toString(); 427 } 428 429 public long getUsedSize() { 430 return this.usedSize; 431 } 432 433 public long getFreeSize() { 434 return this.totalSize - getUsedSize(); 435 } 436 437 public long getTotalSize() { 438 return this.totalSize; 439 } 440 441 /** 442 * Allocate a block with specified size. Return the offset 443 * @param blockSize size of block 444 * @throws BucketAllocatorException 445 * @throws CacheFullException 446 * @return the offset in the IOEngine 447 */ 448 public synchronized long allocateBlock(int blockSize) throws CacheFullException, 449 BucketAllocatorException { 450 assert blockSize > 0; 451 BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize); 452 if (bsi == null) { 453 throw new BucketAllocatorException("Allocation too big size=" + blockSize + 454 "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY + 455 " to accomodate if size seems reasonable and you want it cached."); 456 } 457 long offset = bsi.allocateBlock(); 458 459 // Ask caller to free up space and try again! 460 if (offset < 0) 461 throw new CacheFullException(blockSize, bsi.sizeIndex()); 462 usedSize += bucketSizes[bsi.sizeIndex()]; 463 return offset; 464 } 465 466 private Bucket grabGlobalCompletelyFreeBucket() { 467 for (BucketSizeInfo bsi : bucketSizeInfos) { 468 Bucket b = bsi.findAndRemoveCompletelyFreeBucket(); 469 if (b != null) return b; 470 } 471 return null; 472 } 473 474 /** 475 * Free a block with the offset 476 * @param offset block's offset 477 * @return size freed 478 */ 479 public synchronized int freeBlock(long offset) { 480 int bucketNo = (int) (offset / bucketCapacity); 481 assert bucketNo >= 0 && bucketNo < buckets.length; 482 Bucket targetBucket = buckets[bucketNo]; 483 bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset); 484 usedSize -= targetBucket.getItemAllocationSize(); 485 return targetBucket.getItemAllocationSize(); 486 } 487 488 public int sizeIndexOfAllocation(long offset) { 489 int bucketNo = (int) (offset / bucketCapacity); 490 assert bucketNo >= 0 && bucketNo < buckets.length; 491 Bucket targetBucket = buckets[bucketNo]; 492 return targetBucket.sizeIndex(); 493 } 494 495 public int sizeOfAllocation(long offset) { 496 int bucketNo = (int) (offset / bucketCapacity); 497 assert bucketNo >= 0 && bucketNo < buckets.length; 498 Bucket targetBucket = buckets[bucketNo]; 499 return targetBucket.getItemAllocationSize(); 500 } 501 502 static class IndexStatistics { 503 private long freeCount, usedCount, itemSize, totalCount; 504 505 public long freeCount() { 506 return freeCount; 507 } 508 509 public long usedCount() { 510 return usedCount; 511 } 512 513 public long totalCount() { 514 return totalCount; 515 } 516 517 public long freeBytes() { 518 return freeCount * itemSize; 519 } 520 521 public long usedBytes() { 522 return usedCount * itemSize; 523 } 524 525 public long totalBytes() { 526 return totalCount * itemSize; 527 } 528 529 public long itemSize() { 530 return itemSize; 531 } 532 533 public IndexStatistics(long free, long used, long itemSize) { 534 setTo(free, used, itemSize); 535 } 536 537 public IndexStatistics() { 538 setTo(-1, -1, 0); 539 } 540 541 public void setTo(long free, long used, long itemSize) { 542 this.itemSize = itemSize; 543 this.freeCount = free; 544 this.usedCount = used; 545 this.totalCount = free + used; 546 } 547 } 548 549 public Bucket [] getBuckets() { 550 return this.buckets; 551 } 552 553 void logStatistics() { 554 IndexStatistics total = new IndexStatistics(); 555 IndexStatistics[] stats = getIndexStatistics(total); 556 LOG.info("Bucket allocator statistics follow:\n"); 557 LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes=" 558 + total.usedBytes() + "; total bytes=" + total.totalBytes()); 559 for (IndexStatistics s : stats) { 560 LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount() 561 + "; free=" + s.freeCount() + "; total=" + s.totalCount()); 562 } 563 } 564 565 IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) { 566 IndexStatistics[] stats = getIndexStatistics(); 567 long totalfree = 0, totalused = 0; 568 for (IndexStatistics stat : stats) { 569 totalfree += stat.freeBytes(); 570 totalused += stat.usedBytes(); 571 } 572 grandTotal.setTo(totalfree, totalused, 1); 573 return stats; 574 } 575 576 IndexStatistics[] getIndexStatistics() { 577 IndexStatistics[] stats = new IndexStatistics[bucketSizes.length]; 578 for (int i = 0; i < stats.length; ++i) 579 stats[i] = bucketSizeInfos[i].statistics(); 580 return stats; 581 } 582 583 public long freeBlock(long freeList[]) { 584 long sz = 0; 585 for (int i = 0; i < freeList.length; ++i) 586 sz += freeBlock(freeList[i]); 587 return sz; 588 } 589 590 public int getBucketIndex(long offset) { 591 return (int) (offset / bucketCapacity); 592 } 593 594 /** 595 * Returns a set of indices of the buckets that are least filled 596 * excluding the offsets, we also the fully free buckets for the 597 * BucketSizes where everything is empty and they only have one 598 * completely free bucket as a reserved 599 * 600 * @param excludedBuckets the buckets that need to be excluded due to 601 * currently being in used 602 * @param bucketCount max Number of buckets to return 603 * @return set of bucket indices which could be used for eviction 604 */ 605 public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets, 606 int bucketCount) { 607 Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy( 608 new Comparator<Integer>() { 609 @Override 610 public int compare(Integer left, Integer right) { 611 // We will always get instantiated buckets 612 return Float.compare( 613 ((float) buckets[left].usedCount) / buckets[left].itemCount, 614 ((float) buckets[right].usedCount) / buckets[right].itemCount); 615 } 616 }).maximumSize(bucketCount).create(); 617 618 for (int i = 0; i < buckets.length; i ++ ) { 619 if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() && 620 // Avoid the buckets that are the only buckets for a sizeIndex 621 bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) { 622 queue.add(i); 623 } 624 } 625 626 Set<Integer> result = new HashSet<>(bucketCount); 627 result.addAll(queue); 628 629 return result; 630 } 631}