/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

/**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
 * cache can use off-heap memory {@link ByteBufferIOEngine}, mmap
 * {@link ExclusiveMemoryMmapIOEngine}, pmem {@link SharedMemoryMmapIOEngine} or local files
 * {@link FileIOEngine} to store/read the block data.
 * <p>
 * Eviction is via a similar algorithm as used in
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
 * <p>
 * BucketCache is used mainly as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with an on-heap block
 * cache to decrease CMS GC and heap fragmentation.
 * <p>
 * It can also be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
 * enlarge cache space via a victim cache.
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Priority buckets config */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /** Use strong reference for offsetLock or not */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /** Priority buckets */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each bucket size that is full
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Statistics thread */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  transient final RAMCache ramCache;
  // In this map, store the block's metadata, such as offset and length
  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
   * that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. This is done by taking the hash of the cache key modulo the queue count. When a
   * WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the
   * BucketCache. It then updates the ramCache and backingMap accordingly.
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  private transient final Lock freeSpaceLock = new ReentrantLock();

  private final LongAdder realCacheSize = new LongAdder();
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  private final String persistencePath;
  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing a block which is being read.
   */
  transient final IdReadWriteLock<Long> offsetLock;

  private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
    int nameComparison = a.getHfileName().compareTo(b.getHfileName());
    if (nameComparison != 0) {
      return nameComparison;
    }
    return Long.compare(a.getOffset(), b.getOffset());
  });

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example, free the number of
   * blocks requested * (1 + extraFreeFactor)
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  private String prefetchedFileListPath;

  private long bucketcachePersistInterval;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  private static final String QUEUE_ADDITION_WAIT_TIME =
    "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  private long queueAdditionWaitTime;

  /**
   * Use {@link java.security.MessageDigest} digest algorithms to check persistent file integrity;
   * the default algorithm is MD5.
   */
  private String algorithm;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
    if (useStrongRef) {
      this.offsetLock = new IdReadWriteLockStrongRef<>();
    } else {
      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
    }
    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
    this.writerThreads = new WriterThread[writerThreadNum];
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
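      // blockNumCapacity is also used below to size the backingMap, so it must fit in an int.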
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
    this.queueAdditionWaitTime =
      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
    this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
    this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);

    sanityCheckConfigs();

    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor
      + ", useStrongRef: " + useStrongRef);

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    this.allocFailLogPrevTs = 0;

    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new RAMCache();

    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);

    if (ioEngine.isPersistent() && persistencePath != null) {
      startBucketCachePersisterThread();
      try {
        retrieveFromFile(bucketSizes);
      } catch (IOException ioex) {
        LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
      }
    }
    final String threadName = Thread.currentThread().getName();
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i));
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].setDaemon(true);
    }
    startWriterThreads();

    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
      statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
      + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
  }

  private void sanityCheckConfigs() {
    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= acceptableFactor,
      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
    Preconditions.checkArgument(extraFreeFactor >= 0,
      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
  }

  /**
   * Called by the constructor to start the writer threads. Used by tests that need to override
   * starting the threads.
   */
  protected void startWriterThreads() {
    for (WriterThread thread : writerThreads) {
      thread.start();
    }
  }

  void startBucketCachePersisterThread() {
    BucketCachePersister cachePersister =
      new BucketCachePersister(this, bucketcachePersistInterval);
    cachePersister.start();
  }

  boolean isCacheEnabled() {
    return this.cacheEnabled;
  }

  @Override
  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }

  /**
   * Get the IOEngine from the IO engine name
   * @return the IOEngine
   */
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
    throws IOException {
    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
      // To keep usage simple, the documentation only mentions the 'files:' prefix, whether there
      // is one file or several, but 'file:' is also supported for compatibility.
      String[] filePaths =
        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
      return new FileIOEngine(capacity, persistencePath != null, filePaths);
    } else if (ioEngineName.startsWith("offheap")) {
      return new ByteBufferIOEngine(capacity);
    } else if (ioEngineName.startsWith("mmap:")) {
      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else if (ioEngineName.startsWith("pmem:")) {
      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
      // device. Since the persistent memory device has its own address space, the contents mapped
      // to this address space do not get swapped out the way mmapped DRAM does. Hence the cells
      // created from the hfile blocks in the pmem bucket cache can be referred to directly,
      // without copying them on-heap. Once the RPC is done, the blocks can be returned back, as
      // with ByteBufferIOEngine.
      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else {
      throw new IllegalArgumentException(
        "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
    }
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean waitWhenCache) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
  }

  /**
   * Cache the block to ramCache
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   * @param wait       if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean wait) {
    if (cacheEnabled) {
      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
          BucketEntry bucketEntry = backingMap.get(cacheKey);
          if (bucketEntry != null && bucketEntry.isRpcRef()) {
            // avoid replacing when there are RPC refs for the bucket entry in the bucket cache
            return;
          }
          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
        }
      } else {
        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
      }
    }
  }

  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
  }

  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
    boolean inMemory, boolean wait) {
    if (!cacheEnabled) {
      return;
    }
    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
      cacheKey.setBlockType(cachedItem.getBlockType());
    }
    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
    // Stuff the entry into the RAM cache so it can get drained to the persistent store
    RAMQueueEntry re =
      new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    /**
     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
     * same key in ramCache, and the heap size of the bucket cache needs to be updated when
     * replacing an entry in ramCache. But WriterThread also removes entries from ramCache and
     * updates the heap size; if we used ramCache.put(), the entry removed by WriterThread might
     * not be the correct one, and the heap size accounting would then be wrong (HBASE-20789).
     */
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.increment();
      this.heapSize.add(cachedItem.heapSize());
    }
  }

  /**
   * Get the buffer of the block with the specified key.
   * @param key                block's cache key
   * @param caching            true if the caller caches blocks on cache misses
   * @param repeat             Whether this is a repeat lookup for the same block
   * @param updateCacheMetrics Whether we should update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    if (!cacheEnabled) {
      return null;
    }
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry != null) {
      long start = System.nanoTime();
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      try {
        lock.readLock().lock();
        // We can not read here even if backingMap does contain the given key, because its offset
        // may have changed. If we locked on the BlockCacheKey instead of the offset, then we could
        // only check existence here.
        if (bucketEntry.equals(backingMap.get(key))) {
          // Read the block from IOEngine based on the bucketEntry's offset and length. NOTICE: the
          // block will use the refCnt of bucketEntry, which means that if two HFileBlocks map to
          // the same BucketEntry, then all three will share the same refCnt.
          Cacheable cachedBlock = ioEngine.read(bucketEntry);
          if (ioEngine.usesSharedMemory()) {
            // If the IOEngine uses shared memory, cachedBlock and BucketEntry will share the
            // same RefCnt; retain here in order to count the number of RPC references.
            cachedBlock.retain();
          }
          // Update the cache statistics.
          if (updateCacheMetrics) {
            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
            cacheStats.ioHit(System.nanoTime() - start);
          }
          bucketEntry.access(accessCount.incrementAndGet());
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        lock.readLock().unlock();
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
    }
    return null;
  }

  /**
   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
   */
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
    boolean evictedByEvictionProcess) {
    bucketEntry.markAsEvicted();
    blocksByHFile.remove(cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrement();
    }
    if (evictedByEvictionProcess) {
      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    }
    if (ioEngine.isPersistent()) {
      setCacheInconsistent(true);
    }
  }

  /**
   * Actually free the {@link BucketEntry}; this may only be invoked when
   * {@link BucketEntry#refCnt} becomes 0.
   */
  void freeBucketEntry(BucketEntry bucketEntry) {
    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
    realCacheSize.add(-1 * bucketEntry.getLength());
  }

  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * First, we'll try to remove the block from RAMCache, and then try to evict from backingMap.
   * Here we evict the block from backingMap immediately, but only free the reference from the
   * bucket cache by calling {@link BucketEntry#markedAsEvicted}. If there are still RPCs referring
   * to this block, it can only be de-allocated once all of them release it.
   * <p>
   * NOTICE: we need to grab the offset write lock before releasing the reference from the bucket
   * cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, which is a memory
   * leak.
   * @param cacheKey Block to evict
   * @return true if the block was evicted successfully.
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return doEvictBlock(cacheKey, null, false);
  }

  /**
   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap}
   * and {@link BucketCache#ramCache}. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} can be removed.
   * @param cacheKey    {@link BlockCacheKey} to evict.
   * @param bucketEntry {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
   * @return true if the block was evicted successfully.
   */
  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    boolean evictedByEvictionProcess) {
    if (!cacheEnabled) {
      return false;
    }
    boolean existedInRamCache = removeFromRamCache(cacheKey);
    if (bucketEntry == null) {
      bucketEntry = backingMap.get(cacheKey);
    }
    final BucketEntry bucketEntryToUse = bucketEntry;

    if (bucketEntryToUse == null) {
      if (existedInRamCache && evictedByEvictionProcess) {
        cacheStats.evicted(0, cacheKey.isPrimary());
      }
      return existedInRamCache;
    } else {
      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
          return true;
        }
        return false;
      });
    }
  }

  /**
   * <pre>
   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
   * 1. For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
   * it is the return value of the current {@link BucketCache#createRecycler} method.
   *
   * 2. For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
   * it is {@link ByteBuffAllocator#putbackBuffer}.
   * </pre>
   */
  private Recycler createRecycler(final BucketEntry bucketEntry) {
    return () -> {
      freeBucketEntry(bucketEntry);
      return;
    };
  }

  /**
   * NOTE: This method is only used in tests.
   */
  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
    if (bucketEntry == null) {
      return false;
    }
    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
  }

  /**
   * Evict the {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
   * {@link BucketEntry#isRpcRef} is false. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} can be removed.
   * @param blockCacheKey {@link BlockCacheKey} to evict.
   * @param bucketEntry   {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
   * @return true if the block was evicted successfully.
   */
  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
    if (!bucketEntry.isRpcRef()) {
      return doEvictBlock(blockCacheKey, bucketEntry, true);
    }
    return false;
  }

  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
    return ramCache.remove(cacheKey, re -> {
      if (re != null) {
        this.blockNumber.decrement();
        this.heapSize.add(-1 * re.getData().heapSize());
      }
    });
  }

  public boolean isCacheInconsistent() {
    return isCacheInconsistent.get();
  }

  public void setCacheInconsistent(boolean setCacheInconsistent) {
    isCacheInconsistent.set(setCacheInconsistent);
  }

  /*
   * Statistics thread. Periodically output cache statistics to the log.
   */
  private static class StatisticsThread extends Thread {
    private final BucketCache bucketCache;

    public StatisticsThread(BucketCache bucketCache) {
      super("BucketCacheStatsThread");
      setDaemon(true);
      this.bucketCache = bucketCache;
    }

    @Override
    public void run() {
      bucketCache.logStats();
    }
  }

  public void logStats() {
    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
      + (cacheStats.getHitCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
      + (cacheStats.getHitCachingCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
    cacheStats.reset();

    bucketAllocator.logDebugStatistics();
  }

  public long getRealCacheSize() {
    return this.realCacheSize.sum();
  }

  public long acceptableSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
  }

  long getPartitionSize(float partitionFactor) {
    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
  }

  /**
   * Return the count of bucketSizeInfos that still need free space
   */
  private int bucketSizesAboveThresholdCount(float minFactor) {
    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
    int fullCount = 0;
    for (int i = 0; i < stats.length; i++) {
      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
      freeGoal = Math.max(freeGoal, 1);
      if (stats[i].freeCount() < freeGoal) {
        fullCount++;
      }
    }
    return fullCount;
  }

  /**
   * This method finds the buckets that are minimally occupied and are not referenced anywhere, and
   * frees them completely, without any constraint on the access times of their elements. It frees
   * at most the number of buckets passed; sometimes fewer, due to changing refCounts.
   * @param completelyFreeBucketsNeeded number of buckets to free
   **/
  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
    if (completelyFreeBucketsNeeded != 0) {
      // First we will build a set of the buckets whose offsets are reference counted; usually
      // this set is small, around O(Handler Count), unless something else is wrong
      Set<Integer> inUseBuckets = new HashSet<>();
      backingMap.forEach((k, be) -> {
        if (be.isRpcRef()) {
          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
        }
      });
      Set<Integer> candidateBuckets =
        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
        }
      }
    }
  }

  /**
   * Free space if the used size reaches acceptableSize() or a block of one size could not be
   * allocated. When freeing space, we use the LRU algorithm and ensure that some blocks are
   * evicted.
   * @param why Why we are being called
   */
  void freeSpace(final String why) {
    // Ensure only one freeSpace runs at a time
    if (!freeSpaceLock.tryLock()) {
      return;
    }
    try {
      freeInProgress = true;
      long bytesToFreeWithoutExtra = 0;
      // Calculate the bytes to free for each bucketSizeInfo
      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
      long[] bytesToFreeForBucket = new long[stats.length];
      for (int i = 0; i < stats.length; i++) {
        bytesToFreeForBucket[i] = 0;
        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
        freeGoal = Math.max(freeGoal, 1);
        if (stats[i].freeCount() < freeGoal) {
          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
          if (msgBuffer != null) {
            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
          }
        }
      }
      if (msgBuffer != null) {
        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
      }

      if (bytesToFreeWithoutExtra <= 0) {
        return;
      }
      long currentSize = bucketAllocator.getUsedSize();
      long totalSize = bucketAllocator.getTotalSize();
      if (LOG.isDebugEnabled() && msgBuffer != null) {
        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
          + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
          + StringUtils.byteDesc(totalSize));
      }

      long bytesToFreeWithExtra =
        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));

      // Instantiate priority buckets
      BucketEntryGroup bucketSingle =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
      BucketEntryGroup bucketMulti =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
      BucketEntryGroup bucketMemory =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

      // Scan the entire map, putting each bucket entry into the appropriate bucket entry group
      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
        switch (bucketEntryWithKey.getValue().getPriority()) {
          case SINGLE: {
            bucketSingle.add(bucketEntryWithKey);
            break;
          }
          case MULTI: {
            bucketMulti.add(bucketEntryWithKey);
            break;
          }
          case MEMORY: {
            bucketMemory.add(bucketEntryWithKey);
            break;
          }
        }
      }

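      // Drain the group with the smallest overflow first, so that each group frees no more than
      // its fair share of the remaining bytes to reclaim.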
      PriorityQueue<BucketEntryGroup> bucketQueue =
        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = bucketQueue.size();
      long bytesFreed = 0;

      BucketEntryGroup bucketGroup;
      while ((bucketGroup = bucketQueue.poll()) != null) {
        long overflow = bucketGroup.overflow();
        if (overflow > 0) {
          long bucketBytesToFree =
            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
          bytesFreed += bucketGroup.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

      // Check and free if there are buckets that still need freeing of space
      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
        bucketQueue.clear();
        remainingBuckets = 3;

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        while ((bucketGroup = bucketQueue.poll()) != null) {
          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
          bytesFreed += bucketGroup.free(bucketBytesToFree);
          remainingBuckets--;
        }
      }

      // Even after the above free we might still need to free more because of fragmentation of
      // the buckets (also called the slab calcification problem), i.e. there might be some buckets
      // whose occupancy is very sparse and which therefore do not yield free space for the other
      // bucket sizes. The fix for this is to evict some buckets entirely; we do this by evicting
      // the least-filled buckets.
      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));

      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Bucket cache free space completed; " + "freed="
            + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
            + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
            + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
        }
      }

    } catch (Throwable t) {
      LOG.warn("Failed freeing space", t);
    } finally {
      cacheStats.evict();
      freeInProgress = false;
      freeSpaceLock.unlock();
    }
  }

  // This handles flushing the RAM cache to IOEngine.
  class WriterThread extends Thread {
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    private volatile boolean writerEnabled = true;
    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      super("BucketCacheWriterThread");
      this.inputQueue = queue;
    }

    // Used for test
    void disableWriter() {
      this.writerEnabled = false;
    }

    @Override
    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              if (!cacheEnabled || !writerEnabled) {
                break;
              }
            }
            doDrain(entries, metaBuff);
          } catch (Exception ioe) {
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }
  }

  /**
   * Put the new bucket entry into the backingMap. Notice that we are allowed to replace the
   * existing cache entry with a new block for the same cache key. There's a corner case: one
   * thread caches a block in ramCache, copies it to the io-engine and adds a bucket entry to the
   * backingMap. A thread caching another new block with the same cache key does the same thing,
   * so if the previous bucket entry is not evicted, a memory leak happens because the previous
   * bucketEntry is gone but the bucketAllocator does not free its memory.
   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
   *      cacheKey, Cacheable newBlock)
   * @param key         Block cache key
   * @param bucketEntry Bucket entry to put into backingMap.
   */
  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
    if (previousEntry != null && previousEntry != bucketEntry) {
      previousEntry.withWriteLock(offsetLock, () -> {
        blockEvicted(key, previousEntry, false, false);
        return null;
      });
    }
  }

  /**
   * Prepare and return a warning message for Bucket Allocator Exception
   * @param fle The exception
   * @param re  The RAMQueueEntry for which the exception was thrown.
   * @return A warning message created from the input RAMQueueEntry object.
   */
  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
    final RAMQueueEntry re) {
    final StringBuilder sb = new StringBuilder();
    sb.append("Most recent failed allocation after ");
    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
    sb.append(" ms;");
    if (re != null) {
      if (re.getData() instanceof HFileBlock) {
        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
        final String tableName = Bytes.toString(fileContext.getTableName());
        if (tableName != null) {
          sb.append(" Table: ");
          sb.append(tableName);
        }
        if (columnFamily != null) {
          sb.append(" CF: ");
          sb.append(columnFamily);
        }
        sb.append(" HFile: ");
        if (fileContext.getHFileName() != null) {
          sb.append(fileContext.getHFileName());
        } else {
          sb.append(re.getKey());
        }
      } else {
        sb.append(" HFile: ");
        sb.append(re.getKey());
      }
    }
    sb.append(" Message: ");
    sb.append(fle.getMessage());
    return sb.toString();
  }

  /**
   * Flush the entries in ramCache to the IOEngine and add bucket entries to the backingMap.
   * Process all entries passed in, even on failure, making sure to remove them from ramCache;
   * otherwise we'll never undo the references and we'll OOME.
   * @param entries Presumes the list passed in here will be processed by this invocation only. No
   *                interference expected.
   */
  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
    if (entries.isEmpty()) {
      return;
    }
    // This method is a little hard to follow. We run through the passed-in entries and for each
    // successful add, we add a non-null BucketEntry to the bucketEntries array below. Later we
    // must do cleanup, making sure we've cleared ramCache of all entries regardless of whether we
    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
    // filling ramCache. We do the cleanup by again running through the passed-in entries, doing
    // extra work when we find a non-null corresponding entry in bucketEntries.
    final int size = entries.size();
    BucketEntry[] bucketEntries = new BucketEntry[size];
    // Index updated inside the loop on success or when we can't succeed. We retry if the cache is
    // full when we go to add an entry, by going around the loop again without upping the index.
    int index = 0;
    while (cacheEnabled && index < size) {
      RAMQueueEntry re = null;
      try {
        re = entries.get(index);
        if (re == null) {
          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
          index++;
          continue;
        }
        BlockCacheKey cacheKey = re.getKey();
        if (ramCache.containsKey(cacheKey)) {
          blocksByHFile.add(cacheKey);
        }
        // Reset the position for reuse.
        // It should be guaranteed that the data in the metaBuff has been transferred to the
        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
        // transferred with our current IOEngines. Should take care, when we have new kinds of
        // IOEngine in the future.
        metaBuff.clear();
        BucketEntry bucketEntry =
          re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
        // Successfully added. Up the index and add the bucketEntry. Clear io exceptions.
        bucketEntries[index] = bucketEntry;
        if (ioErrorStartTime > 0) {
          ioErrorStartTime = -1;
        }
        index++;
      } catch (BucketAllocatorException fle) {
        long currTs = EnvironmentEdgeManager.currentTime();
        cacheStats.allocationFailed(); // Record the warning.
        if (
          allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
        ) {
          LOG.warn(getAllocationFailWarningMessage(fle, re));
          allocFailLogPrevTs = currTs;
        }
        // Presume we can't add. Too big? Move the index on. Entry will be cleared from ramCache
        // below.
        bucketEntries[index] = null;
        index++;
      } catch (CacheFullException cfe) {
        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
        if (!freeInProgress) {
          freeSpace("Full!");
        } else {
          Thread.sleep(50);
        }
      } catch (IOException ioex) {
        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
        LOG.error("Failed writing to bucket cache", ioex);
        checkIOErrorIsTolerated();
      }
    }

    // Make sure data pages are written on media before we update maps.
    try {
      ioEngine.sync();
    } catch (IOException ioex) {
      LOG.error("Failed syncing IO engine", ioex);
      checkIOErrorIsTolerated();
      // Since we failed sync, free the blocks in bucket allocator
      for (int i = 0; i < entries.size(); ++i) {
        BucketEntry bucketEntry = bucketEntries[i];
        if (bucketEntry != null) {
          bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
          bucketEntries[i] = null;
        }
      }
    }

    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
    // success or error.
    for (int i = 0; i < size; ++i) {
      BlockCacheKey key = entries.get(i).getKey();
      // Only add if non-null entry.
      if (bucketEntries[i] != null) {
        putIntoBackingMap(key, bucketEntries[i]);
        if (ioEngine.isPersistent()) {
          setCacheInconsistent(true);
        }
      }
      // Always remove from ramCache even if we failed adding it to the block cache above.
      boolean existed = ramCache.remove(key, re -> {
        if (re != null) {
          heapSize.add(-1 * re.getData().heapSize());
        }
      });
      if (!existed && bucketEntries[i] != null) {
        // Block should have already been evicted. Remove it and free space.
        final BucketEntry bucketEntry = bucketEntries[i];
        bucketEntry.withWriteLock(offsetLock, () -> {
          if (backingMap.remove(key, bucketEntry)) {
            blockEvicted(key, bucketEntry, false, false);
          }
          return null;
        });
      }
    }

    long used = bucketAllocator.getUsedSize();
    if (used > acceptableSize()) {
      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
    }
    return;
  }

  /**
   * Blocks until elements are available in {@code q}, then tries to grab as many as possible
   * before returning.
   * @param receptacle Where to stash the elements taken from the queue. We clear it before use,
   *                   just in case.
   * @param q          The queue to take from.
   * @return {@code receptacle} laden with elements taken from the queue, or empty if none found.
   */
  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
    List<RAMQueueEntry> receptacle) throws InterruptedException {
    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it's ok
    // even if the list grew to accommodate thousands.
    receptacle.clear();
    receptacle.add(q.take());
    q.drainTo(receptacle);
    return receptacle;
  }

  /**
   * @see #retrieveFromFile(int[])
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
      justification = "false positive, try-with-resources ensures close is called.")
  void persistToFile() throws IOException {
    if (!ioEngine.isPersistent()) {
      throw new IOException("Attempt to persist non-persistent cache mappings!");
    }
    try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
      fos.write(ProtobufMagic.PB_MAGIC);
      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
    }
    if (prefetchedFileListPath != null) {
      PrefetchExecutor.persistToFile(prefetchedFileListPath);
    }
  }

  /**
   * @see #persistToFile()
   */
  private void retrieveFromFile(int[] bucketSizes) throws IOException {
    File persistenceFile = new File(persistencePath);
    if (!persistenceFile.exists()) {
      return;
    }
    assert !cacheEnabled;
    if (prefetchedFileListPath != null) {
      PrefetchExecutor.retrieveFromFile(prefetchedFileListPath);
    }

    try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
      int pblen = ProtobufMagic.lengthOfPBMagic();
      byte[] pbuf = new byte[pblen];
      int read = in.read(pbuf);
      if (read != pblen) {
        throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
          + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
      }
      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
        // In 3.0 we have enough flexibility to dump the old cache data.
        // TODO: In 2.x line, this might need to be filled in to support reading the old format
        throw new IOException(
          "Persistence file does not start with protobuf magic number. " + persistencePath);
      }
      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
      blockNumber.add(backingMap.size());
    }
  }

  /**
   * Create an input stream that deletes the file after reading it. Use in try-with-resources to
   * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
   *
   * <pre>
   *   File f = ...
   *   try (FileInputStream fis = new FileInputStream(f)) {
   *     // use the input stream
   *   } finally {
   *     if (!f.delete()) throw new IOException("failed to delete");
   *   }
   * </pre>
   *
   * @param file the file to read and delete
   * @return a FileInputStream for the given file
   * @throws IOException if there is a problem creating the stream
   */
  private FileInputStream deleteFileOnClose(final File file) throws IOException {
    return new FileInputStream(file) {
      private File myFile;

      private FileInputStream init(File file) {
        myFile = file;
        return this;
      }

      @Override
      public void close() throws IOException {
        // close() will be called during try-with-resources and it will also be called by the
        // finalizer thread during GC. To avoid double-freeing the resource, set myFile to null
        // after the first call.
        if (myFile == null) {
          return;
        }

        super.close();
        if (!myFile.delete()) {
          throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
        }
        myFile = null;
      }
    }.init(file);
  }

  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
    throws IOException {
    if (capacitySize != cacheCapacity) {
      throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize)
        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
    }
    if (!ioEngine.getClass().getName().equals(ioclass)) {
      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:"
        + ioEngine.getClass().getName());
    }
    if (!backingMap.getClass().getName().equals(mapclass)) {
      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:"
        + backingMap.getClass().getName());
    }
  }

  private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
    if (proto.hasChecksum()) {
      ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
        algorithm);
    } else {
      // If there is no checksum, the persistence file is in the old format.
      LOG.info("Persistent file is old format, it does not support verifying file integrity!");
    }
    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
      this::createRecycler);
  }

  /**
   * Check whether we tolerate the IO error this time. If the duration of the IOEngine throwing
   * errors exceeds ioErrorsTolerationDuration, we will disable the cache.
   */
  private void checkIOErrorIsTolerated() {
    long now = EnvironmentEdgeManager.currentTime();
    // Do a single read to a local variable to avoid timing issue - HBASE-24454
    long ioErrorStartTimeTmp = this.ioErrorStartTime;
    if (ioErrorStartTimeTmp > 0) {
      if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
          + "ms, disabling cache, please check your IOEngine");
        disableCache();
      }
    } else {
      this.ioErrorStartTime = now;
    }
  }

  /**
   * Used to shut down the cache -or- turn it off in the case of something broken.
   */
  private void disableCache() {
    if (!cacheEnabled) return;
    cacheEnabled = false;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i)
      writerThreads[i].interrupt();
    this.ramCache.clear();
    if (!ioEngine.isPersistent() || persistencePath == null) {
      // Not a persistent ioengine (or no path), so clear the backingMap; for a persistent
      // ioengine with a path, the backingMap will be serialized out on shutdown.

  /**
   * Used to shut down the cache -or- turn it off in the case of something broken.
   */
  private void disableCache() {
    if (!cacheEnabled) {
      return;
    }
    cacheEnabled = false;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i].interrupt();
    }
    this.ramCache.clear();
    if (!ioEngine.isPersistent() || persistencePath == null) {
      // Only clear the backingMap when it cannot be persisted; with a persistent ioengine and a
      // persistence path, the backingMap is serialized out instead.
      this.backingMap.clear();
    }
  }

  private void join() throws InterruptedException {
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i].join();
    }
  }

  @Override
  public void shutdown() {
    disableCache();
    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
      + persistencePath);
    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        join();
        persistToFile();
      } catch (IOException ex) {
        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
      } catch (InterruptedException e) {
        LOG.warn("Failed to persist data on exit", e);
      }
    }
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  public BucketAllocator getAllocator() {
    return this.bucketAllocator;
  }

  @Override
  public long heapSize() {
    return this.heapSize.sum();
  }

  @Override
  public long size() {
    return this.realCacheSize.sum();
  }

  @Override
  public long getCurrentDataSize() {
    return size();
  }

  @Override
  public long getFreeSize() {
    return this.bucketAllocator.getFreeSize();
  }

  @Override
  public long getBlockCount() {
    return this.blockNumber.sum();
  }

  @Override
  public long getDataBlockCount() {
    return getBlockCount();
  }

  @Override
  public long getCurrentSize() {
    return this.bucketAllocator.getUsedSize();
  }

  protected String getAlgorithm() {
    return algorithm;
  }

  /**
   * Evicts all blocks for a specific HFile.
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName);
    Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
      true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);

    int numEvicted = 0;
    for (BlockCacheKey key : keySet) {
      if (evictBlock(key)) {
        ++numEvicted;
      }
    }

    return numEvicted;
  }
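
  /*
   * evictBlocksByHfileName() relies on blocksByHFile being a NavigableSet ordered by
   * (hfileName, offset), so two sentinel keys with Long.MIN_VALUE and Long.MAX_VALUE offsets
   * bracket every block of a single file. A self-contained sketch of the same range-query
   * technique (the comparator and names below are illustrative, not the field's actual setup):
   *
   *   NavigableSet<BlockCacheKey> byFile = new ConcurrentSkipListSet<>(Comparator
   *     .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
   *   // ... keys are added as blocks are cached ...
   *   NavigableSet<BlockCacheKey> oneFile = byFile.subSet(
   *     new BlockCacheKey("someHFile", Long.MIN_VALUE), true,
   *     new BlockCacheKey("someHFile", Long.MAX_VALUE), true);
   *   // iterating oneFile visits only the blocks of "someHFile", in offset order
   */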

  /**
   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
   * number of elements out of each according to configuration parameters and their relative sizes.
   */
  private class BucketEntryGroup {

    private CachedEntryQueue queue;
    private long totalSize = 0;
    private long bucketSize;

    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
      this.bucketSize = bucketSize;
      queue = new CachedEntryQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
      totalSize += block.getValue().getLength();
      queue.add(block);
    }

    public long free(long toFree) {
      Map.Entry<BlockCacheKey, BucketEntry> entry;
      long freedBytes = 0;
      // TODO avoid a cycling situation. If we find no block that is not in use, there is no way
      // to free anything. What to do then? Fail the caching attempt? That would need some changes
      // in the cacheBlock API.
      while ((entry = queue.pollLast()) != null) {
        BlockCacheKey blockCacheKey = entry.getKey();
        BucketEntry be = entry.getValue();
        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
          freedBytes += be.getLength();
        }
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }
  }
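
  /*
   * How a caller can use these groups: the eviction algorithm mentioned in the class comment
   * above distributes the bytes it needs to reclaim across the single/multi/memory groups. A
   * simplified sketch of that idea, assuming three already-populated groups and a bytesToFree
   * target (illustrative only, not the exact eviction code found earlier in this class):
   *
   *   PriorityQueue<BucketEntryGroup> groups =
   *     new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
   *   groups.add(singleGroup);
   *   groups.add(multiGroup);
   *   groups.add(memoryGroup);
   *   long freed = 0;
   *   int remainingGroups = groups.size();
   *   BucketEntryGroup group;
   *   while ((group = groups.poll()) != null) {
   *     long overflow = group.overflow();
   *     if (overflow > 0) {
   *       long toFreeFromGroup = Math.min(overflow, (bytesToFree - freed) / remainingGroups);
   *       freed += group.free(toFreeFromGroup);
   *     }
   *     remainingGroups--;
   *   }
   *
   * Each group gives back at most its overflow, and the remaining target is split across the
   * groups not yet visited, which is what "according to configuration parameters and their
   * relative sizes" refers to.
   */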

  /**
   * Block entry stored in memory, with its key, data and related bookkeeping.
   */
  static class RAMQueueEntry {
    private final BlockCacheKey key;
    private final Cacheable data;
    private long accessCounter;
    private boolean inMemory;

    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
      this.key = bck;
      this.data = data;
      this.accessCounter = accessCounter;
      this.inMemory = inMemory;
    }

    public Cacheable getData() {
      return data;
    }

    public BlockCacheKey getKey() {
      return key;
    }

    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
    }

    private ByteBuffAllocator getByteBuffAllocator() {
      if (data instanceof HFileBlock) {
        return ((HFileBlock) data).getByteBuffAllocator();
      }
      return ByteBuffAllocator.HEAP;
    }

    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
      ByteBuffer metaBuff) throws IOException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized
      if (len == 0) {
        return null;
      }
      long offset = alloc.allocateBlock(len);
      boolean succ = false;
      BucketEntry bucketEntry = null;
      try {
        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
          getByteBuffAllocator());
        bucketEntry.setDeserializerReference(data.getDeserializer());
        if (data instanceof HFileBlock) {
          // If an instance of HFileBlock, save on some allocations.
          HFileBlock block = (HFileBlock) data;
          ByteBuff sliceBuf = block.getBufferReadOnly();
          block.getMetaData(metaBuff);
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
        } else {
          // Only used for testing.
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb, true);
          ioEngine.write(bb, offset);
        }
        succ = true;
      } finally {
        if (!succ) {
          alloc.freeBlock(offset, len);
        }
      }
      realCacheSize.add(len);
      return bucketEntry;
    }
  }
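
  /*
   * The IOEngine layout produced by RAMQueueEntry#writeToCache for an HFileBlock, as implied by
   * the two write() offsets above (len = data.getSerializedLength(), metaLen = metaBuff.limit()
   * after getMetaData(metaBuff)):
   *
   *   [offset, offset + len - metaLen)         block payload from getBufferReadOnly()
   *   [offset + len - metaLen, offset + len)   block metadata from getMetaData(metaBuff)
   *
   * i.e. the metadata sits at the tail of the allocated region, so a later read of the full
   * [offset, offset + len) range can reconstruct the block together with its metadata.
   */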

  /**
   * Only used in tests.
   */
  void stopWriterThreads() throws InterruptedException {
    for (WriterThread writerThread : writerThreads) {
      writerThread.disableWriter();
      writerThread.interrupt();
      writerThread.join();
    }
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with the ramCache since entries stay in it only a little while.
    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
      this.backingMap.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return e.getValue().getPriority();
          }

          @Override
          public BlockType getBlockType() {
            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
            return null;
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return e.getValue().getLength();
          }

          @Override
          public long getCachedTime() {
            return e.getValue().getCachedTime();
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) {
              return diff;
            }
            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) {
              return diff;
            }
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException(
                "" + this.getCachedTime() + ", " + other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return e.getKey().hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock) obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  public int getRpcRefCount(BlockCacheKey cacheKey) {
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry != null) {
      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
    }
    return 0;
  }

  float getAcceptableFactor() {
    return acceptableFactor;
  }

  float getMinFactor() {
    return minFactor;
  }

  float getExtraFreeFactor() {
    return extraFreeFactor;
  }

  float getSingleFactor() {
    return singleFactor;
  }

  float getMultiFactor() {
    return multiFactor;
  }

  float getMemoryFactor() {
    return memoryFactor;
  }

  /**
   * Wraps the delegate ConcurrentMap while maintaining the reference count of each block.
   */
  static class RAMCache {
    /**
     * The map is declared as a {@link ConcurrentHashMap} explicitly, because in
     * {@link RAMCache#get(BlockCacheKey)} and
     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} we need the
     * atomicity of map#computeIfPresent(key, func) and map#computeIfAbsent(key, func). Those
     * functions execute exactly once, under the map's lock, and only when the key is present (or
     * absent). Otherwise the reference count of the block would be corrupted. Note that
     * {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that.
     */
    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();

    public boolean containsKey(BlockCacheKey key) {
      return delegate.containsKey(key);
    }

    public RAMQueueEntry get(BlockCacheKey key) {
      return delegate.computeIfPresent(key, (k, re) -> {
        // The entry will be referenced by an RPC, so retain atomically here. If the get and the
        // retain were not atomic, another thread could remove and release the block, and we might
        // retain a block with refCnt=0, which is disallowed. (see HBASE-22422)
        re.getData().retain();
        return re;
      });
    }

    /**
     * Return the previous associated value, or null if absent. It has the same meaning as
     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
     */
    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
      AtomicBoolean absent = new AtomicBoolean(false);
      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
        // The RAMCache now references this entry, so its reference count must be incremented.
        entry.getData().retain();
        absent.set(true);
        return entry;
      });
      return absent.get() ? null : re;
    }

    public boolean remove(BlockCacheKey key) {
      return remove(key, re -> {
      });
    }

    /**
     * A {@link Consumer} is used here because once the removed entry releases its reference
     * count, its ByteBuffers may be recycled, and accessing them outside this method would throw
     * an exception. The consumer accesses the removed entry before its reference count is
     * released. Note: do not change the reference count inside the {@link Consumer}.
     */
    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
      RAMQueueEntry previous = delegate.remove(key);
      action.accept(previous);
      if (previous != null) {
        previous.getData().release();
      }
      return previous != null;
    }

    public boolean isEmpty() {
      return delegate.isEmpty();
    }

    public void clear() {
      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
      while (it.hasNext()) {
        RAMQueueEntry re = it.next().getValue();
        it.remove();
        re.getData().release();
      }
    }
  }
}
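
/*
 * The race that RAMCache#get avoids by retaining inside computeIfPresent, sketched with a
 * non-atomic lookup for contrast (illustrative only; this is what the RAMCache comments above
 * rule out):
 *
 *   RAMQueueEntry re = delegate.get(key);   // thread A looks the entry up
 *   // thread B calls remove(key) here and release() drops the refCnt to 0, freeing the buffers
 *   if (re != null) {
 *     re.getData().retain();                // thread A now retains an already-released block
 *   }
 *
 * computeIfPresent runs the retain while holding the lock for the key's bin, so a concurrent
 * remove() cannot release the block between the lookup and the retain (HBASE-22422).
 */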