001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY; 021 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.FileOutputStream; 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.Comparator; 029import java.util.HashSet; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.NavigableSet; 034import java.util.PriorityQueue; 035import java.util.Set; 036import java.util.concurrent.ArrayBlockingQueue; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.ConcurrentSkipListSet; 041import java.util.concurrent.Executors; 042import java.util.concurrent.ScheduledExecutorService; 043import java.util.concurrent.TimeUnit; 044import java.util.concurrent.atomic.AtomicBoolean; 045import java.util.concurrent.atomic.AtomicLong; 046import java.util.concurrent.atomic.LongAdder; 047import java.util.concurrent.locks.Lock; 048import java.util.concurrent.locks.ReentrantLock; 049import java.util.concurrent.locks.ReentrantReadWriteLock; 050import java.util.function.Consumer; 051import java.util.function.Function; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.Admin; 056import org.apache.hadoop.hbase.io.ByteBuffAllocator; 057import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 058import org.apache.hadoop.hbase.io.HeapSize; 059import org.apache.hadoop.hbase.io.hfile.BlockCache; 060import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 061import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 062import org.apache.hadoop.hbase.io.hfile.BlockPriority; 063import org.apache.hadoop.hbase.io.hfile.BlockType; 064import org.apache.hadoop.hbase.io.hfile.CacheStats; 065import org.apache.hadoop.hbase.io.hfile.Cacheable; 066import org.apache.hadoop.hbase.io.hfile.CachedBlock; 067import org.apache.hadoop.hbase.io.hfile.HFileBlock; 068import org.apache.hadoop.hbase.io.hfile.HFileContext; 069import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; 070import org.apache.hadoop.hbase.nio.ByteBuff; 071import org.apache.hadoop.hbase.nio.RefCnt; 072import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 075import org.apache.hadoop.hbase.util.IdReadWriteLock; 076import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef; 077import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; 078import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; 079import org.apache.hadoop.util.StringUtils; 080import org.apache.yetus.audience.InterfaceAudience; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083 084import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 085import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 086 087import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 088 089/** 090 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache 091 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket 092 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap 093 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files 094 * {@link FileIOEngine} to store/read the block data. 095 * <p> 096 * Eviction is via a similar algorithm as used in 097 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 098 * <p> 099 * BucketCache can be used as mainly a block cache (see 100 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to 101 * decrease CMS GC and heap fragmentation. 102 * <p> 103 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to 104 * enlarge cache space via a victim cache. 105 */ 106@InterfaceAudience.Private 107public class BucketCache implements BlockCache, HeapSize { 108 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 109 110 /** Priority buckets config */ 111 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 112 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 113 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 114 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 115 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 116 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 117 118 /** Use strong reference for offsetLock or not */ 119 private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref"; 120 private static final boolean STRONG_REF_DEFAULT = false; 121 122 /** Priority buckets */ 123 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 124 static final float DEFAULT_MULTI_FACTOR = 0.50f; 125 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 126 static final float DEFAULT_MIN_FACTOR = 0.85f; 127 128 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 129 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 130 131 // Number of blocks to clear for each of the bucket size that is full 132 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 133 134 /** Statistics thread */ 135 private static final int statThreadPeriod = 5 * 60; 136 137 final static int DEFAULT_WRITER_THREADS = 3; 138 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 139 140 // Store/read block data 141 transient final IOEngine ioEngine; 142 143 // Store the block in this map before writing it to cache 144 transient final RAMCache ramCache; 145 // In this map, store the block's meta data like offset, length 146 transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap; 147 148 /** 149 * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so 150 * that Bucket IO exceptions/errors don't bring down the HBase server. 151 */ 152 private volatile boolean cacheEnabled; 153 154 /** 155 * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other 156 * words, the work adding blocks to the BucketCache is divided up amongst the running 157 * WriterThreads. Its done by taking hash of the cache key modulo queue count. WriterThread when 158 * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It 159 * then updates the ramCache and backingMap accordingly. 160 */ 161 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 162 transient final WriterThread[] writerThreads; 163 164 /** Volatile boolean to track if free space is in process or not */ 165 private volatile boolean freeInProgress = false; 166 private transient final Lock freeSpaceLock = new ReentrantLock(); 167 168 private final LongAdder realCacheSize = new LongAdder(); 169 private final LongAdder heapSize = new LongAdder(); 170 /** Current number of cached elements */ 171 private final LongAdder blockNumber = new LongAdder(); 172 173 /** Cache access count (sequential ID) */ 174 private final AtomicLong accessCount = new AtomicLong(); 175 176 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 177 178 private final BucketCacheStats cacheStats = new BucketCacheStats(); 179 180 private final String persistencePath; 181 private final long cacheCapacity; 182 /** Approximate block size */ 183 private final long blockSize; 184 185 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 186 private final int ioErrorsTolerationDuration; 187 // 1 min 188 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 189 190 // Start time of first IO error when reading or writing IO Engine, it will be 191 // reset after a successful read/write. 192 private volatile long ioErrorStartTime = -1; 193 194 /** 195 * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of 196 * this is to avoid freeing the block which is being read. 197 * <p> 198 */ 199 transient final IdReadWriteLock<Long> offsetLock; 200 201 private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> { 202 int nameComparison = a.getHfileName().compareTo(b.getHfileName()); 203 if (nameComparison != 0) { 204 return nameComparison; 205 } 206 return Long.compare(a.getOffset(), b.getOffset()); 207 }); 208 209 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 210 private transient final ScheduledExecutorService scheduleThreadPool = 211 Executors.newScheduledThreadPool(1, 212 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 213 214 // Allocate or free space for the block 215 private transient BucketAllocator bucketAllocator; 216 217 /** Acceptable size of cache (no evictions if size < acceptable) */ 218 private float acceptableFactor; 219 220 /** Minimum threshold of cache (when evicting, evict until size < min) */ 221 private float minFactor; 222 223 /** 224 * Free this floating point factor of extra blocks when evicting. For example free the number of 225 * blocks requested * (1 + extraFreeFactor) 226 */ 227 private float extraFreeFactor; 228 229 /** Single access bucket size */ 230 private float singleFactor; 231 232 /** Multiple access bucket size */ 233 private float multiFactor; 234 235 /** In-memory bucket size */ 236 private float memoryFactor; 237 238 private String prefetchedFileListPath; 239 240 private static final String FILE_VERIFY_ALGORITHM = 241 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 242 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 243 244 private static final String QUEUE_ADDITION_WAIT_TIME = 245 "hbase.bucketcache.queue.addition.waittime"; 246 private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0; 247 private long queueAdditionWaitTime; 248 /** 249 * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file 250 * integrity, default algorithm is MD5 251 */ 252 private String algorithm; 253 254 /* Tracing failed Bucket Cache allocations. */ 255 private long allocFailLogPrevTs; // time of previous log event for allocation failure. 256 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute. 257 258 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 259 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 260 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 261 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 262 } 263 264 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 265 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 266 Configuration conf) throws IOException { 267 boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT); 268 if (useStrongRef) { 269 this.offsetLock = new IdReadWriteLockStrongRef<>(); 270 } else { 271 this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT); 272 } 273 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 274 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 275 this.writerThreads = new WriterThread[writerThreadNum]; 276 long blockNumCapacity = capacity / blockSize; 277 if (blockNumCapacity >= Integer.MAX_VALUE) { 278 // Enough for about 32TB of cache! 279 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 280 } 281 282 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 283 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 284 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 285 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 286 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 287 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 288 this.queueAdditionWaitTime = 289 conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); 290 this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY); 291 292 sanityCheckConfigs(); 293 294 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 295 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 296 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor 297 + ", useStrongRef: " + useStrongRef); 298 299 this.cacheCapacity = capacity; 300 this.persistencePath = persistencePath; 301 this.blockSize = blockSize; 302 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 303 304 this.allocFailLogPrevTs = 0; 305 306 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 307 for (int i = 0; i < writerThreads.length; ++i) { 308 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 309 } 310 311 assert writerQueues.size() == writerThreads.length; 312 this.ramCache = new RAMCache(); 313 314 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 315 316 if (ioEngine.isPersistent() && persistencePath != null) { 317 try { 318 retrieveFromFile(bucketSizes); 319 } catch (IOException ioex) { 320 LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex); 321 } 322 } 323 final String threadName = Thread.currentThread().getName(); 324 this.cacheEnabled = true; 325 for (int i = 0; i < writerThreads.length; ++i) { 326 writerThreads[i] = new WriterThread(writerQueues.get(i)); 327 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 328 writerThreads[i].setDaemon(true); 329 } 330 startWriterThreads(); 331 332 // Run the statistics thread periodically to print the cache statistics log 333 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 334 // every five minutes. 335 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 336 statThreadPeriod, TimeUnit.SECONDS); 337 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 338 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 339 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 340 + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 341 } 342 343 private void sanityCheckConfigs() { 344 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, 345 ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 346 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, 347 MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 348 Preconditions.checkArgument(minFactor <= acceptableFactor, 349 MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 350 Preconditions.checkArgument(extraFreeFactor >= 0, 351 EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 352 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, 353 SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 354 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, 355 MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 356 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, 357 MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 358 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, 359 SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and " 360 + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 361 } 362 363 /** 364 * Called by the constructor to start the writer threads. Used by tests that need to override 365 * starting the threads. 366 */ 367 protected void startWriterThreads() { 368 for (WriterThread thread : writerThreads) { 369 thread.start(); 370 } 371 } 372 373 boolean isCacheEnabled() { 374 return this.cacheEnabled; 375 } 376 377 @Override 378 public long getMaxSize() { 379 return this.cacheCapacity; 380 } 381 382 public String getIoEngine() { 383 return ioEngine.toString(); 384 } 385 386 /** 387 * Get the IOEngine from the IO engine name 388 * @return the IOEngine 389 */ 390 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 391 throws IOException { 392 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 393 // In order to make the usage simple, we only need the prefix 'files:' in 394 // document whether one or multiple file(s), but also support 'file:' for 395 // the compatibility 396 String[] filePaths = 397 ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); 398 return new FileIOEngine(capacity, persistencePath != null, filePaths); 399 } else if (ioEngineName.startsWith("offheap")) { 400 return new ByteBufferIOEngine(capacity); 401 } else if (ioEngineName.startsWith("mmap:")) { 402 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 403 } else if (ioEngineName.startsWith("pmem:")) { 404 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 405 // device. Since the persistent memory device has its own address space the contents 406 // mapped to this address space does not get swapped out like in the case of mmapping 407 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 408 // can be directly referred to without having to copy them onheap. Once the RPC is done, 409 // the blocks can be returned back as in case of ByteBufferIOEngine. 410 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 411 } else { 412 throw new IllegalArgumentException( 413 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 414 } 415 } 416 417 /** 418 * Cache the block with the specified name and buffer. 419 * @param cacheKey block's cache key 420 * @param buf block buffer 421 */ 422 @Override 423 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 424 cacheBlock(cacheKey, buf, false); 425 } 426 427 /** 428 * Cache the block with the specified name and buffer. 429 * @param cacheKey block's cache key 430 * @param cachedItem block buffer 431 * @param inMemory if block is in-memory 432 */ 433 @Override 434 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 435 cacheBlockWithWait(cacheKey, cachedItem, inMemory, false); 436 } 437 438 /** 439 * Cache the block with the specified name and buffer. 440 * @param cacheKey block's cache key 441 * @param cachedItem block buffer 442 * @param inMemory if block is in-memory 443 */ 444 @Override 445 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 446 boolean waitWhenCache) { 447 cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0); 448 } 449 450 /** 451 * Cache the block to ramCache 452 * @param cacheKey block's cache key 453 * @param cachedItem block buffer 454 * @param inMemory if block is in-memory 455 * @param wait if true, blocking wait when queue is full 456 */ 457 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 458 boolean wait) { 459 if (cacheEnabled) { 460 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 461 if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { 462 BucketEntry bucketEntry = backingMap.get(cacheKey); 463 if (bucketEntry != null && bucketEntry.isRpcRef()) { 464 // avoid replace when there are RPC refs for the bucket entry in bucket cache 465 return; 466 } 467 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 468 } 469 } else { 470 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 471 } 472 } 473 } 474 475 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 476 return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); 477 } 478 479 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 480 boolean inMemory, boolean wait) { 481 if (!cacheEnabled) { 482 return; 483 } 484 if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { 485 cacheKey.setBlockType(cachedItem.getBlockType()); 486 } 487 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 488 // Stuff the entry into the RAM cache so it can get drained to the persistent store 489 RAMQueueEntry re = 490 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); 491 /** 492 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 493 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 494 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 495 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 496 * one, then the heap size will mess up (HBASE-20789) 497 */ 498 if (ramCache.putIfAbsent(cacheKey, re) != null) { 499 return; 500 } 501 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 502 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 503 boolean successfulAddition = false; 504 if (wait) { 505 try { 506 successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); 507 } catch (InterruptedException e) { 508 Thread.currentThread().interrupt(); 509 } 510 } else { 511 successfulAddition = bq.offer(re); 512 } 513 if (!successfulAddition) { 514 ramCache.remove(cacheKey); 515 cacheStats.failInsert(); 516 } else { 517 this.blockNumber.increment(); 518 this.heapSize.add(cachedItem.heapSize()); 519 } 520 } 521 522 /** 523 * Get the buffer of the block with the specified key. 524 * @param key block's cache key 525 * @param caching true if the caller caches blocks on cache misses 526 * @param repeat Whether this is a repeat lookup for the same block 527 * @param updateCacheMetrics Whether we should update cache metrics or not 528 * @return buffer of specified cache key, or null if not in cache 529 */ 530 @Override 531 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 532 boolean updateCacheMetrics) { 533 if (!cacheEnabled) { 534 return null; 535 } 536 RAMQueueEntry re = ramCache.get(key); 537 if (re != null) { 538 if (updateCacheMetrics) { 539 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 540 } 541 re.access(accessCount.incrementAndGet()); 542 return re.getData(); 543 } 544 BucketEntry bucketEntry = backingMap.get(key); 545 if (bucketEntry != null) { 546 long start = System.nanoTime(); 547 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 548 try { 549 lock.readLock().lock(); 550 // We can not read here even if backingMap does contain the given key because its offset 551 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 552 // existence here. 553 if (bucketEntry.equals(backingMap.get(key))) { 554 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 555 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 556 // the same BucketEntry, then all of the three will share the same refCnt. 557 Cacheable cachedBlock = ioEngine.read(bucketEntry); 558 if (ioEngine.usesSharedMemory()) { 559 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 560 // same RefCnt, do retain here, in order to count the number of RPC references 561 cachedBlock.retain(); 562 } 563 // Update the cache statistics. 564 if (updateCacheMetrics) { 565 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 566 cacheStats.ioHit(System.nanoTime() - start); 567 } 568 bucketEntry.access(accessCount.incrementAndGet()); 569 if (this.ioErrorStartTime > 0) { 570 ioErrorStartTime = -1; 571 } 572 return cachedBlock; 573 } 574 } catch (IOException ioex) { 575 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 576 checkIOErrorIsTolerated(); 577 } finally { 578 lock.readLock().unlock(); 579 } 580 } 581 if (!repeat && updateCacheMetrics) { 582 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 583 } 584 return null; 585 } 586 587 /** 588 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 589 */ 590 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 591 boolean evictedByEvictionProcess) { 592 bucketEntry.markAsEvicted(); 593 blocksByHFile.remove(cacheKey); 594 if (decrementBlockNumber) { 595 this.blockNumber.decrement(); 596 } 597 if (evictedByEvictionProcess) { 598 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 599 } 600 } 601 602 /** 603 * Free the {{@link BucketEntry} actually,which could only be invoked when the 604 * {@link BucketEntry#refCnt} becoming 0. 605 */ 606 void freeBucketEntry(BucketEntry bucketEntry) { 607 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 608 realCacheSize.add(-1 * bucketEntry.getLength()); 609 } 610 611 /** 612 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 613 * 1. Close an HFile, and clear all cached blocks. <br> 614 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 615 * <p> 616 * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap. 617 * Here we evict the block from backingMap immediately, but only free the reference from bucket 618 * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this 619 * block, block can only be de-allocated when all of them release the block. 620 * <p> 621 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 622 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 623 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 624 * @param cacheKey Block to evict 625 * @return true to indicate whether we've evicted successfully or not. 626 */ 627 @Override 628 public boolean evictBlock(BlockCacheKey cacheKey) { 629 return doEvictBlock(cacheKey, null, false); 630 } 631 632 /** 633 * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and 634 * {@link BucketCache#ramCache}. <br/> 635 * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 636 * {@link BucketEntry} could be removed. 637 * @param cacheKey {@link BlockCacheKey} to evict. 638 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 639 * @return true to indicate whether we've evicted successfully or not. 640 */ 641 private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, 642 boolean evictedByEvictionProcess) { 643 if (!cacheEnabled) { 644 return false; 645 } 646 boolean existedInRamCache = removeFromRamCache(cacheKey); 647 if (bucketEntry == null) { 648 bucketEntry = backingMap.get(cacheKey); 649 } 650 final BucketEntry bucketEntryToUse = bucketEntry; 651 652 if (bucketEntryToUse == null) { 653 if (existedInRamCache && evictedByEvictionProcess) { 654 cacheStats.evicted(0, cacheKey.isPrimary()); 655 } 656 return existedInRamCache; 657 } else { 658 return bucketEntryToUse.withWriteLock(offsetLock, () -> { 659 if (backingMap.remove(cacheKey, bucketEntryToUse)) { 660 blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess); 661 return true; 662 } 663 return false; 664 }); 665 } 666 } 667 668 /** 669 * <pre> 670 * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as 671 * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}. 672 * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf} 673 * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different: 674 * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap}, 675 * it is the return value of current {@link BucketCache#createRecycler} method. 676 * 677 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 678 * it is {@link ByteBuffAllocator#putbackBuffer}. 679 * </pre> 680 */ 681 private Recycler createRecycler(final BucketEntry bucketEntry) { 682 return () -> { 683 freeBucketEntry(bucketEntry); 684 return; 685 }; 686 } 687 688 /** 689 * NOTE: This method is only for test. 690 */ 691 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { 692 BucketEntry bucketEntry = backingMap.get(blockCacheKey); 693 if (bucketEntry == null) { 694 return false; 695 } 696 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); 697 } 698 699 /** 700 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if 701 * {@link BucketEntry#isRpcRef} is false. <br/> 702 * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 703 * {@link BucketEntry} could be removed. 704 * @param blockCacheKey {@link BlockCacheKey} to evict. 705 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 706 * @return true to indicate whether we've evicted successfully or not. 707 */ 708 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { 709 if (!bucketEntry.isRpcRef()) { 710 return doEvictBlock(blockCacheKey, bucketEntry, true); 711 } 712 return false; 713 } 714 715 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 716 return ramCache.remove(cacheKey, re -> { 717 if (re != null) { 718 this.blockNumber.decrement(); 719 this.heapSize.add(-1 * re.getData().heapSize()); 720 } 721 }); 722 } 723 724 /* 725 * Statistics thread. Periodically output cache statistics to the log. 726 */ 727 private static class StatisticsThread extends Thread { 728 private final BucketCache bucketCache; 729 730 public StatisticsThread(BucketCache bucketCache) { 731 super("BucketCacheStatsThread"); 732 setDaemon(true); 733 this.bucketCache = bucketCache; 734 } 735 736 @Override 737 public void run() { 738 bucketCache.logStats(); 739 } 740 } 741 742 public void logStats() { 743 long totalSize = bucketAllocator.getTotalSize(); 744 long usedSize = bucketAllocator.getUsedSize(); 745 long freeSize = totalSize - usedSize; 746 long cacheSize = getRealCacheSize(); 747 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 748 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 749 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 750 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 751 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 752 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 753 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 754 + (cacheStats.getHitCount() == 0 755 ? "0," 756 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 757 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 758 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 759 + (cacheStats.getHitCachingCount() == 0 760 ? "0," 761 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 762 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 763 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 764 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount()); 765 cacheStats.reset(); 766 767 bucketAllocator.logDebugStatistics(); 768 } 769 770 public long getRealCacheSize() { 771 return this.realCacheSize.sum(); 772 } 773 774 public long acceptableSize() { 775 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 776 } 777 778 long getPartitionSize(float partitionFactor) { 779 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 780 } 781 782 /** 783 * Return the count of bucketSizeinfos still need free space 784 */ 785 private int bucketSizesAboveThresholdCount(float minFactor) { 786 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 787 int fullCount = 0; 788 for (int i = 0; i < stats.length; i++) { 789 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 790 freeGoal = Math.max(freeGoal, 1); 791 if (stats[i].freeCount() < freeGoal) { 792 fullCount++; 793 } 794 } 795 return fullCount; 796 } 797 798 /** 799 * This method will find the buckets that are minimally occupied and are not reference counted and 800 * will free them completely without any constraint on the access times of the elements, and as a 801 * process will completely free at most the number of buckets passed, sometimes it might not due 802 * to changing refCounts 803 * @param completelyFreeBucketsNeeded number of buckets to free 804 **/ 805 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 806 if (completelyFreeBucketsNeeded != 0) { 807 // First we will build a set where the offsets are reference counted, usually 808 // this set is small around O(Handler Count) unless something else is wrong 809 Set<Integer> inUseBuckets = new HashSet<>(); 810 backingMap.forEach((k, be) -> { 811 if (be.isRpcRef()) { 812 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 813 } 814 }); 815 Set<Integer> candidateBuckets = 816 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 817 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 818 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 819 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 820 } 821 } 822 } 823 } 824 825 /** 826 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 827 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 828 * blocks evicted 829 * @param why Why we are being called 830 */ 831 void freeSpace(final String why) { 832 // Ensure only one freeSpace progress at a time 833 if (!freeSpaceLock.tryLock()) { 834 return; 835 } 836 try { 837 freeInProgress = true; 838 long bytesToFreeWithoutExtra = 0; 839 // Calculate free byte for each bucketSizeinfo 840 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 841 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 842 long[] bytesToFreeForBucket = new long[stats.length]; 843 for (int i = 0; i < stats.length; i++) { 844 bytesToFreeForBucket[i] = 0; 845 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 846 freeGoal = Math.max(freeGoal, 1); 847 if (stats[i].freeCount() < freeGoal) { 848 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 849 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 850 if (msgBuffer != null) { 851 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 852 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 853 } 854 } 855 } 856 if (msgBuffer != null) { 857 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 858 } 859 860 if (bytesToFreeWithoutExtra <= 0) { 861 return; 862 } 863 long currentSize = bucketAllocator.getUsedSize(); 864 long totalSize = bucketAllocator.getTotalSize(); 865 if (LOG.isDebugEnabled() && msgBuffer != null) { 866 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() 867 + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 868 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 869 + StringUtils.byteDesc(totalSize)); 870 } 871 872 long bytesToFreeWithExtra = 873 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 874 875 // Instantiate priority buckets 876 BucketEntryGroup bucketSingle = 877 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 878 BucketEntryGroup bucketMulti = 879 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 880 BucketEntryGroup bucketMemory = 881 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 882 883 // Scan entire map putting bucket entry into appropriate bucket entry 884 // group 885 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 886 switch (bucketEntryWithKey.getValue().getPriority()) { 887 case SINGLE: { 888 bucketSingle.add(bucketEntryWithKey); 889 break; 890 } 891 case MULTI: { 892 bucketMulti.add(bucketEntryWithKey); 893 break; 894 } 895 case MEMORY: { 896 bucketMemory.add(bucketEntryWithKey); 897 break; 898 } 899 } 900 } 901 902 PriorityQueue<BucketEntryGroup> bucketQueue = 903 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 904 905 bucketQueue.add(bucketSingle); 906 bucketQueue.add(bucketMulti); 907 bucketQueue.add(bucketMemory); 908 909 int remainingBuckets = bucketQueue.size(); 910 long bytesFreed = 0; 911 912 BucketEntryGroup bucketGroup; 913 while ((bucketGroup = bucketQueue.poll()) != null) { 914 long overflow = bucketGroup.overflow(); 915 if (overflow > 0) { 916 long bucketBytesToFree = 917 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 918 bytesFreed += bucketGroup.free(bucketBytesToFree); 919 } 920 remainingBuckets--; 921 } 922 923 // Check and free if there are buckets that still need freeing of space 924 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 925 bucketQueue.clear(); 926 remainingBuckets = 3; 927 928 bucketQueue.add(bucketSingle); 929 bucketQueue.add(bucketMulti); 930 bucketQueue.add(bucketMemory); 931 932 while ((bucketGroup = bucketQueue.poll()) != null) { 933 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 934 bytesFreed += bucketGroup.free(bucketBytesToFree); 935 remainingBuckets--; 936 } 937 } 938 939 // Even after the above free we might still need freeing because of the 940 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 941 // there might be some buckets where the occupancy is very sparse and thus are not 942 // yielding the free for the other bucket sizes, the fix for this to evict some 943 // of the buckets, we do this by evicting the buckets that are least fulled 944 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 945 946 if (LOG.isDebugEnabled()) { 947 long single = bucketSingle.totalSize(); 948 long multi = bucketMulti.totalSize(); 949 long memory = bucketMemory.totalSize(); 950 if (LOG.isDebugEnabled()) { 951 LOG.debug("Bucket cache free space completed; " + "freed=" 952 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 953 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 954 + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 955 } 956 } 957 958 } catch (Throwable t) { 959 LOG.warn("Failed freeing space", t); 960 } finally { 961 cacheStats.evict(); 962 freeInProgress = false; 963 freeSpaceLock.unlock(); 964 } 965 } 966 967 // This handles flushing the RAM cache to IOEngine. 968 class WriterThread extends Thread { 969 private final BlockingQueue<RAMQueueEntry> inputQueue; 970 private volatile boolean writerEnabled = true; 971 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 972 973 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 974 super("BucketCacheWriterThread"); 975 this.inputQueue = queue; 976 } 977 978 // Used for test 979 void disableWriter() { 980 this.writerEnabled = false; 981 } 982 983 @Override 984 public void run() { 985 List<RAMQueueEntry> entries = new ArrayList<>(); 986 try { 987 while (cacheEnabled && writerEnabled) { 988 try { 989 try { 990 // Blocks 991 entries = getRAMQueueEntries(inputQueue, entries); 992 } catch (InterruptedException ie) { 993 if (!cacheEnabled || !writerEnabled) { 994 break; 995 } 996 } 997 doDrain(entries, metaBuff); 998 } catch (Exception ioe) { 999 LOG.error("WriterThread encountered error", ioe); 1000 } 1001 } 1002 } catch (Throwable t) { 1003 LOG.warn("Failed doing drain", t); 1004 } 1005 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 1006 } 1007 } 1008 1009 /** 1010 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 1011 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 1012 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 1013 * with the same cache key do the same thing for the same cache key, so if not evict the previous 1014 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 1015 * bucketAllocator do not free its memory. 1016 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 1017 * cacheKey, Cacheable newBlock) 1018 * @param key Block cache key 1019 * @param bucketEntry Bucket entry to put into backingMap. 1020 */ 1021 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 1022 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 1023 if (previousEntry != null && previousEntry != bucketEntry) { 1024 previousEntry.withWriteLock(offsetLock, () -> { 1025 blockEvicted(key, previousEntry, false, false); 1026 return null; 1027 }); 1028 } 1029 } 1030 1031 /** 1032 * Prepare and return a warning message for Bucket Allocator Exception 1033 * @param fle The exception 1034 * @param re The RAMQueueEntry for which the exception was thrown. 1035 * @return A warning message created from the input RAMQueueEntry object. 1036 */ 1037 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle, 1038 final RAMQueueEntry re) { 1039 final StringBuilder sb = new StringBuilder(); 1040 sb.append("Most recent failed allocation after "); 1041 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD); 1042 sb.append(" ms;"); 1043 if (re != null) { 1044 if (re.getData() instanceof HFileBlock) { 1045 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext(); 1046 final String columnFamily = Bytes.toString(fileContext.getColumnFamily()); 1047 final String tableName = Bytes.toString(fileContext.getTableName()); 1048 if (tableName != null) { 1049 sb.append(" Table: "); 1050 sb.append(tableName); 1051 } 1052 if (columnFamily != null) { 1053 sb.append(" CF: "); 1054 sb.append(columnFamily); 1055 } 1056 sb.append(" HFile: "); 1057 if (fileContext.getHFileName() != null) { 1058 sb.append(fileContext.getHFileName()); 1059 } else { 1060 sb.append(re.getKey()); 1061 } 1062 } else { 1063 sb.append(" HFile: "); 1064 sb.append(re.getKey()); 1065 } 1066 } 1067 sb.append(" Message: "); 1068 sb.append(fle.getMessage()); 1069 return sb.toString(); 1070 } 1071 1072 /** 1073 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that 1074 * are passed in even if failure being sure to remove from ramCache else we'll never undo the 1075 * references and we'll OOME. 1076 * @param entries Presumes list passed in here will be processed by this invocation only. No 1077 * interference expected. 1078 */ 1079 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 1080 if (entries.isEmpty()) { 1081 return; 1082 } 1083 // This method is a little hard to follow. We run through the passed in entries and for each 1084 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 1085 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 1086 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 1087 // filling ramCache. We do the clean up by again running through the passed in entries 1088 // doing extra work when we find a non-null bucketEntries corresponding entry. 1089 final int size = entries.size(); 1090 BucketEntry[] bucketEntries = new BucketEntry[size]; 1091 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1092 // when we go to add an entry by going around the loop again without upping the index. 1093 int index = 0; 1094 while (cacheEnabled && index < size) { 1095 RAMQueueEntry re = null; 1096 try { 1097 re = entries.get(index); 1098 if (re == null) { 1099 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1100 index++; 1101 continue; 1102 } 1103 BlockCacheKey cacheKey = re.getKey(); 1104 if (ramCache.containsKey(cacheKey)) { 1105 blocksByHFile.add(cacheKey); 1106 } 1107 // Reset the position for reuse. 1108 // It should be guaranteed that the data in the metaBuff has been transferred to the 1109 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1110 // transferred with our current IOEngines. Should take care, when we have new kinds of 1111 // IOEngine in the future. 1112 metaBuff.clear(); 1113 BucketEntry bucketEntry = 1114 re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); 1115 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 1116 bucketEntries[index] = bucketEntry; 1117 if (ioErrorStartTime > 0) { 1118 ioErrorStartTime = -1; 1119 } 1120 index++; 1121 } catch (BucketAllocatorException fle) { 1122 long currTs = EnvironmentEdgeManager.currentTime(); 1123 cacheStats.allocationFailed(); // Record the warning. 1124 if ( 1125 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1126 ) { 1127 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1128 allocFailLogPrevTs = currTs; 1129 } 1130 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1131 bucketEntries[index] = null; 1132 index++; 1133 } catch (CacheFullException cfe) { 1134 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1135 if (!freeInProgress) { 1136 freeSpace("Full!"); 1137 } else { 1138 Thread.sleep(50); 1139 } 1140 } catch (IOException ioex) { 1141 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1142 LOG.error("Failed writing to bucket cache", ioex); 1143 checkIOErrorIsTolerated(); 1144 } 1145 } 1146 1147 // Make sure data pages are written on media before we update maps. 1148 try { 1149 ioEngine.sync(); 1150 } catch (IOException ioex) { 1151 LOG.error("Failed syncing IO engine", ioex); 1152 checkIOErrorIsTolerated(); 1153 // Since we failed sync, free the blocks in bucket allocator 1154 for (int i = 0; i < entries.size(); ++i) { 1155 BucketEntry bucketEntry = bucketEntries[i]; 1156 if (bucketEntry != null) { 1157 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 1158 bucketEntries[i] = null; 1159 } 1160 } 1161 } 1162 1163 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1164 // success or error. 1165 for (int i = 0; i < size; ++i) { 1166 BlockCacheKey key = entries.get(i).getKey(); 1167 // Only add if non-null entry. 1168 if (bucketEntries[i] != null) { 1169 putIntoBackingMap(key, bucketEntries[i]); 1170 } 1171 // Always remove from ramCache even if we failed adding it to the block cache above. 1172 boolean existed = ramCache.remove(key, re -> { 1173 if (re != null) { 1174 heapSize.add(-1 * re.getData().heapSize()); 1175 } 1176 }); 1177 if (!existed && bucketEntries[i] != null) { 1178 // Block should have already been evicted. Remove it and free space. 1179 final BucketEntry bucketEntry = bucketEntries[i]; 1180 bucketEntry.withWriteLock(offsetLock, () -> { 1181 if (backingMap.remove(key, bucketEntry)) { 1182 blockEvicted(key, bucketEntry, false, false); 1183 } 1184 return null; 1185 }); 1186 } 1187 } 1188 1189 long used = bucketAllocator.getUsedSize(); 1190 if (used > acceptableSize()) { 1191 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1192 } 1193 return; 1194 } 1195 1196 /** 1197 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1198 * returning. 1199 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1200 * in case. 1201 * @param q The queue to take from. 1202 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1203 */ 1204 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1205 List<RAMQueueEntry> receptacle) throws InterruptedException { 1206 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1207 // ok even if list grew to accommodate thousands. 1208 receptacle.clear(); 1209 receptacle.add(q.take()); 1210 q.drainTo(receptacle); 1211 return receptacle; 1212 } 1213 1214 /** 1215 * @see #retrieveFromFile(int[]) 1216 */ 1217 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1218 justification = "false positive, try-with-resources ensures close is called.") 1219 private void persistToFile() throws IOException { 1220 assert !cacheEnabled; 1221 if (!ioEngine.isPersistent()) { 1222 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1223 } 1224 try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) { 1225 fos.write(ProtobufMagic.PB_MAGIC); 1226 BucketProtoUtils.toPB(this).writeDelimitedTo(fos); 1227 } 1228 if (prefetchedFileListPath != null) { 1229 PrefetchExecutor.persistToFile(prefetchedFileListPath); 1230 } 1231 } 1232 1233 /** 1234 * @see #persistToFile() 1235 */ 1236 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1237 File persistenceFile = new File(persistencePath); 1238 if (!persistenceFile.exists()) { 1239 return; 1240 } 1241 assert !cacheEnabled; 1242 if (prefetchedFileListPath != null) { 1243 PrefetchExecutor.retrieveFromFile(prefetchedFileListPath); 1244 } 1245 1246 try (FileInputStream in = deleteFileOnClose(persistenceFile)) { 1247 int pblen = ProtobufMagic.lengthOfPBMagic(); 1248 byte[] pbuf = new byte[pblen]; 1249 int read = in.read(pbuf); 1250 if (read != pblen) { 1251 throw new IOException("Incorrect number of bytes read while checking for protobuf magic " 1252 + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath); 1253 } 1254 if (!ProtobufMagic.isPBMagicPrefix(pbuf)) { 1255 // In 3.0 we have enough flexibility to dump the old cache data. 1256 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1257 throw new IOException( 1258 "Persistence file does not start with protobuf magic number. " + persistencePath); 1259 } 1260 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1261 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1262 blockNumber.add(backingMap.size()); 1263 } 1264 } 1265 1266 /** 1267 * Create an input stream that deletes the file after reading it. Use in try-with-resources to 1268 * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: 1269 * 1270 * <pre> 1271 * File f = ... 1272 * try (FileInputStream fis = new FileInputStream(f)) { 1273 * // use the input stream 1274 * } finally { 1275 * if (!f.delete()) throw new IOException("failed to delete"); 1276 * } 1277 * </pre> 1278 * 1279 * @param file the file to read and delete 1280 * @return a FileInputStream for the given file 1281 * @throws IOException if there is a problem creating the stream 1282 */ 1283 private FileInputStream deleteFileOnClose(final File file) throws IOException { 1284 return new FileInputStream(file) { 1285 private File myFile; 1286 1287 private FileInputStream init(File file) { 1288 myFile = file; 1289 return this; 1290 } 1291 1292 @Override 1293 public void close() throws IOException { 1294 // close() will be called during try-with-resources and it will be 1295 // called by finalizer thread during GC. To avoid double-free resource, 1296 // set myFile to null after the first call. 1297 if (myFile == null) { 1298 return; 1299 } 1300 1301 super.close(); 1302 if (!myFile.delete()) { 1303 throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath()); 1304 } 1305 myFile = null; 1306 } 1307 }.init(file); 1308 } 1309 1310 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1311 throws IOException { 1312 if (capacitySize != cacheCapacity) { 1313 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1314 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1315 } 1316 if (!ioEngine.getClass().getName().equals(ioclass)) { 1317 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1318 + ioEngine.getClass().getName()); 1319 } 1320 if (!backingMap.getClass().getName().equals(mapclass)) { 1321 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1322 + backingMap.getClass().getName()); 1323 } 1324 } 1325 1326 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1327 if (proto.hasChecksum()) { 1328 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1329 algorithm); 1330 } else { 1331 // if has not checksum, it means the persistence file is old format 1332 LOG.info("Persistent file is old format, it does not support verifying file integrity!"); 1333 } 1334 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1335 backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1336 this::createRecycler); 1337 } 1338 1339 /** 1340 * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors 1341 * exceeds ioErrorsDurationTimeTolerated, we will disable the cache 1342 */ 1343 private void checkIOErrorIsTolerated() { 1344 long now = EnvironmentEdgeManager.currentTime(); 1345 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1346 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1347 if (ioErrorStartTimeTmp > 0) { 1348 if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1349 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration 1350 + "ms, disabling cache, please check your IOEngine"); 1351 disableCache(); 1352 } 1353 } else { 1354 this.ioErrorStartTime = now; 1355 } 1356 } 1357 1358 /** 1359 * Used to shut down the cache -or- turn it off in the case of something broken. 1360 */ 1361 private void disableCache() { 1362 if (!cacheEnabled) return; 1363 cacheEnabled = false; 1364 ioEngine.shutdown(); 1365 this.scheduleThreadPool.shutdown(); 1366 for (int i = 0; i < writerThreads.length; ++i) 1367 writerThreads[i].interrupt(); 1368 this.ramCache.clear(); 1369 if (!ioEngine.isPersistent() || persistencePath == null) { 1370 // If persistent ioengine and a path, we will serialize out the backingMap. 1371 this.backingMap.clear(); 1372 } 1373 } 1374 1375 private void join() throws InterruptedException { 1376 for (int i = 0; i < writerThreads.length; ++i) 1377 writerThreads[i].join(); 1378 } 1379 1380 @Override 1381 public void shutdown() { 1382 disableCache(); 1383 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" 1384 + persistencePath); 1385 if (ioEngine.isPersistent() && persistencePath != null) { 1386 try { 1387 join(); 1388 persistToFile(); 1389 } catch (IOException ex) { 1390 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1391 } catch (InterruptedException e) { 1392 LOG.warn("Failed to persist data on exit", e); 1393 } 1394 } 1395 } 1396 1397 @Override 1398 public CacheStats getStats() { 1399 return cacheStats; 1400 } 1401 1402 public BucketAllocator getAllocator() { 1403 return this.bucketAllocator; 1404 } 1405 1406 @Override 1407 public long heapSize() { 1408 return this.heapSize.sum(); 1409 } 1410 1411 @Override 1412 public long size() { 1413 return this.realCacheSize.sum(); 1414 } 1415 1416 @Override 1417 public long getCurrentDataSize() { 1418 return size(); 1419 } 1420 1421 @Override 1422 public long getFreeSize() { 1423 return this.bucketAllocator.getFreeSize(); 1424 } 1425 1426 @Override 1427 public long getBlockCount() { 1428 return this.blockNumber.sum(); 1429 } 1430 1431 @Override 1432 public long getDataBlockCount() { 1433 return getBlockCount(); 1434 } 1435 1436 @Override 1437 public long getCurrentSize() { 1438 return this.bucketAllocator.getUsedSize(); 1439 } 1440 1441 protected String getAlgorithm() { 1442 return algorithm; 1443 } 1444 1445 /** 1446 * Evicts all blocks for a specific HFile. 1447 * <p> 1448 * This is used for evict-on-close to remove all blocks of a specific HFile. 1449 * @return the number of blocks evicted 1450 */ 1451 @Override 1452 public int evictBlocksByHfileName(String hfileName) { 1453 PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName); 1454 Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), 1455 true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); 1456 1457 int numEvicted = 0; 1458 for (BlockCacheKey key : keySet) { 1459 if (evictBlock(key)) { 1460 ++numEvicted; 1461 } 1462 } 1463 1464 return numEvicted; 1465 } 1466 1467 /** 1468 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1469 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1470 * number of elements out of each according to configuration parameters and their relative sizes. 1471 */ 1472 private class BucketEntryGroup { 1473 1474 private CachedEntryQueue queue; 1475 private long totalSize = 0; 1476 private long bucketSize; 1477 1478 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1479 this.bucketSize = bucketSize; 1480 queue = new CachedEntryQueue(bytesToFree, blockSize); 1481 totalSize = 0; 1482 } 1483 1484 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1485 totalSize += block.getValue().getLength(); 1486 queue.add(block); 1487 } 1488 1489 public long free(long toFree) { 1490 Map.Entry<BlockCacheKey, BucketEntry> entry; 1491 long freedBytes = 0; 1492 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1493 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 1494 while ((entry = queue.pollLast()) != null) { 1495 BlockCacheKey blockCacheKey = entry.getKey(); 1496 BucketEntry be = entry.getValue(); 1497 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1498 freedBytes += be.getLength(); 1499 } 1500 if (freedBytes >= toFree) { 1501 return freedBytes; 1502 } 1503 } 1504 return freedBytes; 1505 } 1506 1507 public long overflow() { 1508 return totalSize - bucketSize; 1509 } 1510 1511 public long totalSize() { 1512 return totalSize; 1513 } 1514 } 1515 1516 /** 1517 * Block Entry stored in the memory with key,data and so on 1518 */ 1519 static class RAMQueueEntry { 1520 private final BlockCacheKey key; 1521 private final Cacheable data; 1522 private long accessCounter; 1523 private boolean inMemory; 1524 1525 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) { 1526 this.key = bck; 1527 this.data = data; 1528 this.accessCounter = accessCounter; 1529 this.inMemory = inMemory; 1530 } 1531 1532 public Cacheable getData() { 1533 return data; 1534 } 1535 1536 public BlockCacheKey getKey() { 1537 return key; 1538 } 1539 1540 public void access(long accessCounter) { 1541 this.accessCounter = accessCounter; 1542 } 1543 1544 private ByteBuffAllocator getByteBuffAllocator() { 1545 if (data instanceof HFileBlock) { 1546 return ((HFileBlock) data).getByteBuffAllocator(); 1547 } 1548 return ByteBuffAllocator.HEAP; 1549 } 1550 1551 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1552 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 1553 ByteBuffer metaBuff) throws IOException { 1554 int len = data.getSerializedLength(); 1555 // This cacheable thing can't be serialized 1556 if (len == 0) { 1557 return null; 1558 } 1559 long offset = alloc.allocateBlock(len); 1560 boolean succ = false; 1561 BucketEntry bucketEntry = null; 1562 try { 1563 bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler, 1564 getByteBuffAllocator()); 1565 bucketEntry.setDeserializerReference(data.getDeserializer()); 1566 if (data instanceof HFileBlock) { 1567 // If an instance of HFileBlock, save on some allocations. 1568 HFileBlock block = (HFileBlock) data; 1569 ByteBuff sliceBuf = block.getBufferReadOnly(); 1570 block.getMetaData(metaBuff); 1571 ioEngine.write(sliceBuf, offset); 1572 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 1573 } else { 1574 // Only used for testing. 1575 ByteBuffer bb = ByteBuffer.allocate(len); 1576 data.serialize(bb, true); 1577 ioEngine.write(bb, offset); 1578 } 1579 succ = true; 1580 } finally { 1581 if (!succ) { 1582 alloc.freeBlock(offset, len); 1583 } 1584 } 1585 realCacheSize.add(len); 1586 return bucketEntry; 1587 } 1588 } 1589 1590 /** 1591 * Only used in test 1592 */ 1593 void stopWriterThreads() throws InterruptedException { 1594 for (WriterThread writerThread : writerThreads) { 1595 writerThread.disableWriter(); 1596 writerThread.interrupt(); 1597 writerThread.join(); 1598 } 1599 } 1600 1601 @Override 1602 public Iterator<CachedBlock> iterator() { 1603 // Don't bother with ramcache since stuff is in here only a little while. 1604 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 1605 return new Iterator<CachedBlock>() { 1606 private final long now = System.nanoTime(); 1607 1608 @Override 1609 public boolean hasNext() { 1610 return i.hasNext(); 1611 } 1612 1613 @Override 1614 public CachedBlock next() { 1615 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1616 return new CachedBlock() { 1617 @Override 1618 public String toString() { 1619 return BlockCacheUtil.toString(this, now); 1620 } 1621 1622 @Override 1623 public BlockPriority getBlockPriority() { 1624 return e.getValue().getPriority(); 1625 } 1626 1627 @Override 1628 public BlockType getBlockType() { 1629 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 1630 return null; 1631 } 1632 1633 @Override 1634 public long getOffset() { 1635 return e.getKey().getOffset(); 1636 } 1637 1638 @Override 1639 public long getSize() { 1640 return e.getValue().getLength(); 1641 } 1642 1643 @Override 1644 public long getCachedTime() { 1645 return e.getValue().getCachedTime(); 1646 } 1647 1648 @Override 1649 public String getFilename() { 1650 return e.getKey().getHfileName(); 1651 } 1652 1653 @Override 1654 public int compareTo(CachedBlock other) { 1655 int diff = this.getFilename().compareTo(other.getFilename()); 1656 if (diff != 0) return diff; 1657 1658 diff = Long.compare(this.getOffset(), other.getOffset()); 1659 if (diff != 0) return diff; 1660 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1661 throw new IllegalStateException( 1662 "" + this.getCachedTime() + ", " + other.getCachedTime()); 1663 } 1664 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1665 } 1666 1667 @Override 1668 public int hashCode() { 1669 return e.getKey().hashCode(); 1670 } 1671 1672 @Override 1673 public boolean equals(Object obj) { 1674 if (obj instanceof CachedBlock) { 1675 CachedBlock cb = (CachedBlock) obj; 1676 return compareTo(cb) == 0; 1677 } else { 1678 return false; 1679 } 1680 } 1681 }; 1682 } 1683 1684 @Override 1685 public void remove() { 1686 throw new UnsupportedOperationException(); 1687 } 1688 }; 1689 } 1690 1691 @Override 1692 public BlockCache[] getBlockCaches() { 1693 return null; 1694 } 1695 1696 public int getRpcRefCount(BlockCacheKey cacheKey) { 1697 BucketEntry bucketEntry = backingMap.get(cacheKey); 1698 if (bucketEntry != null) { 1699 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1); 1700 } 1701 return 0; 1702 } 1703 1704 float getAcceptableFactor() { 1705 return acceptableFactor; 1706 } 1707 1708 float getMinFactor() { 1709 return minFactor; 1710 } 1711 1712 float getExtraFreeFactor() { 1713 return extraFreeFactor; 1714 } 1715 1716 float getSingleFactor() { 1717 return singleFactor; 1718 } 1719 1720 float getMultiFactor() { 1721 return multiFactor; 1722 } 1723 1724 float getMemoryFactor() { 1725 return memoryFactor; 1726 } 1727 1728 /** 1729 * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. 1730 */ 1731 static class RAMCache { 1732 /** 1733 * Defined the map as {@link ConcurrentHashMap} explicitly here, because in 1734 * {@link RAMCache#get(BlockCacheKey)} and 1735 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee 1736 * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the 1737 * func method can execute exactly once only when the key is present(or absent) and under the 1738 * lock context. Otherwise, the reference count of block will be messed up. Notice that the 1739 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 1740 */ 1741 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 1742 1743 public boolean containsKey(BlockCacheKey key) { 1744 return delegate.containsKey(key); 1745 } 1746 1747 public RAMQueueEntry get(BlockCacheKey key) { 1748 return delegate.computeIfPresent(key, (k, re) -> { 1749 // It'll be referenced by RPC, so retain atomically here. if the get and retain is not 1750 // atomic, another thread may remove and release the block, when retaining in this thread we 1751 // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422) 1752 re.getData().retain(); 1753 return re; 1754 }); 1755 } 1756 1757 /** 1758 * Return the previous associated value, or null if absent. It has the same meaning as 1759 * {@link ConcurrentMap#putIfAbsent(Object, Object)} 1760 */ 1761 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 1762 AtomicBoolean absent = new AtomicBoolean(false); 1763 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 1764 // The RAMCache reference to this entry, so reference count should be increment. 1765 entry.getData().retain(); 1766 absent.set(true); 1767 return entry; 1768 }); 1769 return absent.get() ? null : re; 1770 } 1771 1772 public boolean remove(BlockCacheKey key) { 1773 return remove(key, re -> { 1774 }); 1775 } 1776 1777 /** 1778 * Defined an {@link Consumer} here, because once the removed entry release its reference count, 1779 * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an 1780 * exception. the consumer will access entry to remove before release its reference count. 1781 * Notice, don't change its reference count in the {@link Consumer} 1782 */ 1783 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 1784 RAMQueueEntry previous = delegate.remove(key); 1785 action.accept(previous); 1786 if (previous != null) { 1787 previous.getData().release(); 1788 } 1789 return previous != null; 1790 } 1791 1792 public boolean isEmpty() { 1793 return delegate.isEmpty(); 1794 } 1795 1796 public void clear() { 1797 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 1798 while (it.hasNext()) { 1799 RAMQueueEntry re = it.next().getValue(); 1800 it.remove(); 1801 re.getData().release(); 1802 } 1803 } 1804 } 1805}