/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

/**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
 * cache can use off-heap memory ({@link ByteBufferIOEngine}), mmap
 * ({@link ExclusiveMemoryMmapIOEngine}), pmem ({@link SharedMemoryMmapIOEngine}) or local files
 * ({@link FileIOEngine}) to store/read the block data.
 * <p>
 * Eviction uses an algorithm similar to that of
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}.
 * <p>
 * BucketCache is mainly used as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a first-level
 * on-heap BlockCache to decrease CMS GC and heap fragmentation.
 * <p>
 * It can also be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
 * enlarge cache space via a victim cache.
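 * <p>
 * A minimal construction sketch, for orientation only. The values below are hypothetical; in a
 * real deployment the cache is normally wired up from {@link CacheConfig} settings rather than
 * constructed directly:
 *
 * <pre>
 * // 1 GB off-heap cache with 16 KB blocks, default bucket sizes (null), 3 writer threads and
 * // 64-entry writer queues, without persistence (throws IOException on engine setup failure).
 * BucketCache cache =
 *   new BucketCache("offheap", 1024L * 1024 * 1024, 16 * 1024, null, 3, 64, null);
 * </pre>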
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Priority buckets config */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /** Use strong reference for offsetLock or not */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /** Priority buckets */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each of the bucket size that is full
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Statistics thread */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  transient final RAMCache ramCache;

  // In this map, store the block's meta data like offset, length
  transient Map<BlockCacheKey, BucketEntry> backingMap;

  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);

  /**
   * Map of hFile -> Region -> File size. This map is used to track all files that have completed
   * prefetch, together with the region they belong to and the total cached size for that region.
   */
  final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();

  /**
   * Map of region -> total size of the region prefetched on this region server. This is the total
   * size of hFiles for this region prefetched on this region server.
   */
  final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>();

  private BucketCachePersister cachePersister;

  /**
   * Flag if the cache is enabled or not. We shut it off if there are IO errors for some time, so
   * that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. This is done by taking the hash of the cache key modulo the queue count. When
   * a WriterThread runs, it takes whatever has been recently added and 'drains' the entries to
   * the BucketCache. It then updates the ramCache and backingMap accordingly.
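   * <p>
   * For example, the queue for a given key is selected the same way as in
   * {@link #cacheBlockWithWaitInternal}:
   *
   * <pre>
   * int queueNum = (cacheKey.hashCode() &amp; 0x7FFFFFFF) % writerQueues.size();
   * </pre>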
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  private transient final Lock freeSpaceLock = new ReentrantLock();

  private final LongAdder realCacheSize = new LongAdder();
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  private final BucketCacheStats cacheStats = new BucketCacheStats();
  private final String persistencePath;
  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing a block which is being read.
   */
  transient final IdReadWriteLock<Long> offsetLock;

  NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
    Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example, free the number
   * of blocks requested * (1 + extraFreeFactor).
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  private long bucketcachePersistInterval;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  private static final String QUEUE_ADDITION_WAIT_TIME =
    "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  private long queueAdditionWaitTime;

  /**
   * Use {@link java.security.MessageDigest} class's digest algorithms to check persistent file
   * integrity; the default algorithm is MD5.
   */
  private String algorithm;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    Preconditions.checkArgument(blockSize > 0,
      "BucketCache blockSize is set to " + blockSize + ", can not be less than or equal to 0");
    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
    if (useStrongRef) {
      this.offsetLock = new IdReadWriteLockStrongRef<>();
    } else {
      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
    }
    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
    this.writerThreads = new WriterThread[writerThreadNum];
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
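      // (For example, at a hypothetical 16 KiB block size, Integer.MAX_VALUE blocks come to
      // roughly 2^31 * 2^14 bytes = 2^45 bytes = 32 TiB, which is where the figure above
      // comes from.)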
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
    this.queueAdditionWaitTime =
      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
    this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);

    sanityCheckConfigs();

    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor
      + ", useStrongRef: " + useStrongRef);

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    this.allocFailLogPrevTs = 0;

    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new RAMCache();

    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);

    if (isCachePersistent()) {
      if (ioEngine instanceof FileIOEngine) {
        startBucketCachePersisterThread();
      }
      try {
        retrieveFromFile(bucketSizes);
      } catch (IOException ioex) {
        LOG.error("Can't restore from file[{}]", persistencePath, ioex);
        backingMap.clear();
        fullyCachedFiles.clear();
        backingMapValidated.set(true);
        bucketAllocator = new BucketAllocator(capacity, bucketSizes);
        regionCachedSize.clear();
      }
    } else {
      bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    }
    final String threadName = Thread.currentThread().getName();
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i));
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].setDaemon(true);
    }
    startWriterThreads();

    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
    // every five minutes.
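    // statThreadPeriod is in seconds (5 * 60), so with TimeUnit.SECONDS below the statistics
    // log is emitted every five minutes.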
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
      statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
      + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
  }

  private void sanityCheckConfigs() {
    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= acceptableFactor,
      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
    Preconditions.checkArgument(extraFreeFactor >= 0,
      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than or equal to 0.0");
    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
  }

  /**
   * Called by the constructor to start the writer threads. Used by tests that need to override
   * starting the threads.
   */
  protected void startWriterThreads() {
    for (WriterThread thread : writerThreads) {
      thread.start();
    }
  }

  void startBucketCachePersisterThread() {
    LOG.info("Starting BucketCachePersisterThread");
    cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
    cachePersister.setDaemon(true);
    cachePersister.start();
  }

  boolean isCacheEnabled() {
    return this.cacheEnabled;
  }

  @Override
  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }

  /**
   * Get the IOEngine from the IO engine name
   * @return the IOEngine
   */
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
    throws IOException {
    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
      // To keep usage simple, the documentation only mentions the 'files:' prefix, whether there
      // are one or multiple file(s); 'file:' is also supported for backward compatibility.
      String[] filePaths =
        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
      return new FileIOEngine(capacity, persistencePath != null, filePaths);
    } else if (ioEngineName.startsWith("offheap")) {
      return new ByteBufferIOEngine(capacity);
    } else if (ioEngineName.startsWith("mmap:")) {
      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else if (ioEngineName.startsWith("pmem:")) {
      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
      // device. Since the persistent memory device has its own address space, the contents
      // mapped to this address space do not get swapped out like in the case of mmapping
      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
      // can be referred to directly without having to copy them on-heap. Once the RPC is done,
      // the blocks can be returned, as in the case of ByteBufferIOEngine.
      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else {
      throw new IllegalArgumentException("Don't understand io engine name for cache - "
        + "prefix with file:, files:, mmap:, pmem: or offheap");
    }
  }

  public boolean isCachePersistenceEnabled() {
    return persistencePath != null;
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey      block's cache key
   * @param cachedItem    block buffer
   * @param inMemory      if block is in-memory
   * @param waitWhenCache if true, wait for the queue when it is full (only effective when
   *                      queueAdditionWaitTime > 0)
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean waitWhenCache) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
  }

  /**
   * Cache the block to ramCache
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   * @param wait       if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean wait) {
    if (cacheEnabled) {
      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
          BucketEntry bucketEntry = backingMap.get(cacheKey);
          if (bucketEntry != null && bucketEntry.isRpcRef()) {
            // avoid replace when there are RPC refs for the bucket entry in bucket cache
            return;
          }
          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
        }
      } else {
        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
      }
    }
  }

  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
  }

  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
    boolean inMemory, boolean wait) {
    if (!cacheEnabled) {
      return;
    }
    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
      cacheKey.setBlockType(cachedItem.getBlockType());
    }
    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
    // Stuff the entry into the RAM cache so it can get drained to the persistent store
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
    /**
     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
     * same key in ramCache, and the heap size of the bucket cache needs to be updated if we
     * replace an entry in ramCache. But the WriterThread also removes entries from ramCache and
     * updates the heap size; if we used ramCache.put(), the entry removed by the WriterThread
     * might not be the correct one, and then the heap size accounting would be wrong
     * (HBASE-20789).
     */
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.increment();
      this.heapSize.add(cachedItem.heapSize());
    }
  }

  /**
   * Get the buffer of the block with the specified key.
   * @param key                block's cache key
   * @param caching            true if the caller caches blocks on cache misses
   * @param repeat             Whether this is a repeat lookup for the same block
   * @param updateCacheMetrics Whether we should update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    if (!cacheEnabled) {
      return null;
    }
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry != null) {
      long start = System.nanoTime();
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      try {
        lock.readLock().lock();
        // We can not read here even if backingMap does contain the given key, because its offset
        // may have changed. If we locked on the BlockCacheKey instead of the offset, we could
        // only check existence here.
        if (bucketEntry.equals(backingMap.get(key))) {
          // Read the block from the IOEngine based on the bucketEntry's offset and length.
          // NOTICE: the block will use the refCnt of bucketEntry, which means that if two
          // HFileBlocks map to the same BucketEntry, those two blocks and the BucketEntry all
          // share the same refCnt.
          Cacheable cachedBlock = ioEngine.read(bucketEntry);
          if (ioEngine.usesSharedMemory()) {
            // If the IOEngine uses shared memory, cachedBlock and BucketEntry will share the
            // same RefCnt; retain here in order to count the number of RPC references.
            cachedBlock.retain();
          }
          // Update the cache statistics.
          if (updateCacheMetrics) {
            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
            cacheStats.ioHit(System.nanoTime() - start);
          }
          bucketEntry.access(accessCount.incrementAndGet());
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (HBaseIOException hioex) {
        // When using the file io engine persistent cache,
        // the cache map state might differ from the actual cache. If we reach this block,
        // we should remove the cache key entry from the backing map
        backingMap.remove(key);
        fullyCachedFiles.remove(key.getHfileName());
        LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        lock.readLock().unlock();
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
    }
    return null;
  }

  /**
   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
   */
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
    boolean evictedByEvictionProcess) {
    bucketEntry.markAsEvicted();
    blocksByHFile.remove(cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrement();
      if (ioEngine.isPersistent()) {
        fileNotFullyCached(cacheKey.getHfileName());
      }
    }
    if (evictedByEvictionProcess) {
      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    }
    if (ioEngine.isPersistent()) {
      setCacheInconsistent(true);
    }
  }

  private void fileNotFullyCached(String hfileName) {
    // Update the regionCachedSize map before removing the file from fullyCachedFiles
    if (fullyCachedFiles.containsKey(hfileName)) {
      Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName);
      String regionEncodedName = regionEntry.getFirst();
      long filePrefetchSize = regionEntry.getSecond();
      LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName);
      regionCachedSize.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize);
      // If all the blocks for a region are evicted from the cache, remove the entry for that
      // region
      if (
        regionCachedSize.containsKey(regionEncodedName)
          && regionCachedSize.get(regionEncodedName) == 0
      ) {
        regionCachedSize.remove(regionEncodedName);
      }
    }
    fullyCachedFiles.remove(hfileName);
  }

  public void fileCacheCompleted(Path filePath, long size) {
    Pair<String, Long> pair = new Pair<>();
    // sets the region name
    String regionName = filePath.getParent().getParent().getName();
    pair.setFirst(regionName);
    pair.setSecond(size);
    fullyCachedFiles.put(filePath.getName(), pair);
  }

  private void updateRegionCachedSize(Path filePath, long cachedSize) {
    if (filePath != null) {
      String regionName = filePath.getParent().getParent().getName();
      regionCachedSize.merge(regionName, cachedSize,
        (previousSize, newBlockSize) -> previousSize + newBlockSize);
    }
  }

  /**
   * Free the {@link BucketEntry} for real; this can only be invoked when the
   * {@link BucketEntry#refCnt} becomes 0.
   */
  void freeBucketEntry(BucketEntry bucketEntry) {
    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
    realCacheSize.add(-1 * bucketEntry.getLength());
  }

  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * Firstly, we'll try to remove the block from RAMCache, and then try to evict from backingMap.
   * Here we evict the block from backingMap immediately, but only free the reference from the
   * bucket cache by calling {@link BucketEntry#markAsEvicted}. If there are still RPCs referring
   * to this block, it can only be de-allocated once all of them release the block.
   * <p>
   * NOTICE: we need to grab the offset write lock before releasing the reference from the bucket
   * cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, which is a memory
   * leak.
   * @param cacheKey Block to evict
   * @return true if the block was evicted, false otherwise.
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return doEvictBlock(cacheKey, null, false);
  }

  /**
   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap}
   * and {@link BucketCache#ramCache}. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} are removed.
   * @param cacheKey    {@link BlockCacheKey} to evict.
   * @param bucketEntry {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
   * @return true if the block was evicted, false otherwise.
   */
  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    boolean evictedByEvictionProcess) {
    if (!cacheEnabled) {
      return false;
    }
    boolean existedInRamCache = removeFromRamCache(cacheKey);
    if (bucketEntry == null) {
      bucketEntry = backingMap.get(cacheKey);
    }
    final BucketEntry bucketEntryToUse = bucketEntry;

    if (bucketEntryToUse == null) {
      if (existedInRamCache && evictedByEvictionProcess) {
        cacheStats.evicted(0, cacheKey.isPrimary());
      }
      return existedInRamCache;
    } else {
      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
          LOG.debug("removed key {} from back map with offset lock {} in the evict process",
            cacheKey, bucketEntryToUse.offset());
          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
          return true;
        }
        return false;
      });
    }
  }

  /**
   * <pre>
   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
   * 1. For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
   *    it is the return value of this {@link BucketCache#createRecycler} method.
   *
   * 2. For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
   *    it is {@link ByteBuffAllocator#putbackBuffer}.
   * </pre>
   */
  private Recycler createRecycler(final BucketEntry bucketEntry) {
    return () -> freeBucketEntry(bucketEntry);
  }

  /**
   * NOTE: This method is only for test.
   */
  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
    if (bucketEntry == null) {
      return false;
    }
    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
  }

  /**
   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
   * {@link BucketEntry#isRpcRef} is false. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} are removed.
   * @param blockCacheKey {@link BlockCacheKey} to evict.
   * @param bucketEntry   {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
   * @return true if the block was evicted, false otherwise.
   */
  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
    if (!bucketEntry.isRpcRef()) {
      return doEvictBlock(blockCacheKey, bucketEntry, true);
    }
    return false;
  }

  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
    return ramCache.remove(cacheKey, re -> {
      if (re != null) {
        this.blockNumber.decrement();
        this.heapSize.add(-1 * re.getData().heapSize());
      }
    });
  }

  public boolean isCacheInconsistent() {
    return isCacheInconsistent.get();
  }

  public void setCacheInconsistent(boolean setCacheInconsistent) {
    isCacheInconsistent.set(setCacheInconsistent);
  }

  /*
   * Statistics thread. Periodically output cache statistics to the log.
   */
  private static class StatisticsThread extends Thread {
    private final BucketCache bucketCache;

    public StatisticsThread(BucketCache bucketCache) {
      super("BucketCacheStatsThread");
      setDaemon(true);
      this.bucketCache = bucketCache;
    }

    @Override
    public void run() {
      bucketCache.logStats();
    }
  }

  public void logStats() {
    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
      + (cacheStats.getHitCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
      + (cacheStats.getHitCachingCount() == 0
"0," 861 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 862 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 863 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 864 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount()); 865 cacheStats.reset(); 866 867 bucketAllocator.logDebugStatistics(); 868 } 869 870 public long getRealCacheSize() { 871 return this.realCacheSize.sum(); 872 } 873 874 public long acceptableSize() { 875 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 876 } 877 878 long getPartitionSize(float partitionFactor) { 879 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 880 } 881 882 /** 883 * Return the count of bucketSizeinfos still need free space 884 */ 885 private int bucketSizesAboveThresholdCount(float minFactor) { 886 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 887 int fullCount = 0; 888 for (int i = 0; i < stats.length; i++) { 889 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 890 freeGoal = Math.max(freeGoal, 1); 891 if (stats[i].freeCount() < freeGoal) { 892 fullCount++; 893 } 894 } 895 return fullCount; 896 } 897 898 /** 899 * This method will find the buckets that are minimally occupied and are not reference counted and 900 * will free them completely without any constraint on the access times of the elements, and as a 901 * process will completely free at most the number of buckets passed, sometimes it might not due 902 * to changing refCounts 903 * @param completelyFreeBucketsNeeded number of buckets to free 904 **/ 905 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 906 if (completelyFreeBucketsNeeded != 0) { 907 // First we will build a set where the offsets are reference counted, usually 908 // this set is small around O(Handler Count) unless something else is wrong 909 Set<Integer> inUseBuckets = new HashSet<>(); 910 backingMap.forEach((k, be) -> { 911 if (be.isRpcRef()) { 912 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 913 } 914 }); 915 Set<Integer> candidateBuckets = 916 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 917 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 918 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 919 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 920 } 921 } 922 } 923 } 924 925 /** 926 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 927 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 928 * blocks evicted 929 * @param why Why we are being called 930 */ 931 void freeSpace(final String why) { 932 // Ensure only one freeSpace progress at a time 933 if (!freeSpaceLock.tryLock()) { 934 return; 935 } 936 try { 937 freeInProgress = true; 938 long bytesToFreeWithoutExtra = 0; 939 // Calculate free byte for each bucketSizeinfo 940 StringBuilder msgBuffer = LOG.isDebugEnabled() ? 
      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
      long[] bytesToFreeForBucket = new long[stats.length];
      for (int i = 0; i < stats.length; i++) {
        bytesToFreeForBucket[i] = 0;
        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
        freeGoal = Math.max(freeGoal, 1);
        if (stats[i].freeCount() < freeGoal) {
          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
          if (msgBuffer != null) {
            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
          }
        }
      }
      if (msgBuffer != null) {
        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
      }

      if (bytesToFreeWithoutExtra <= 0) {
        return;
      }
      long currentSize = bucketAllocator.getUsedSize();
      long totalSize = bucketAllocator.getTotalSize();
      if (LOG.isDebugEnabled() && msgBuffer != null) {
        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
          + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
          + StringUtils.byteDesc(totalSize));
      }

      long bytesToFreeWithExtra =
        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));

      // Instantiate priority buckets
      BucketEntryGroup bucketSingle =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
      BucketEntryGroup bucketMulti =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
      BucketEntryGroup bucketMemory =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

      // Scan the entire map, putting each bucket entry into the appropriate bucket entry group
      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
        switch (bucketEntryWithKey.getValue().getPriority()) {
          case SINGLE: {
            bucketSingle.add(bucketEntryWithKey);
            break;
          }
          case MULTI: {
            bucketMulti.add(bucketEntryWithKey);
            break;
          }
          case MEMORY: {
            bucketMemory.add(bucketEntryWithKey);
            break;
          }
        }
      }

      PriorityQueue<BucketEntryGroup> bucketQueue =
        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = bucketQueue.size();
      long bytesFreed = 0;

      BucketEntryGroup bucketGroup;
      while ((bucketGroup = bucketQueue.poll()) != null) {
        long overflow = bucketGroup.overflow();
        if (overflow > 0) {
          long bucketBytesToFree =
            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
          bytesFreed += bucketGroup.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

      // Check and free if there are buckets that still need freeing of space
      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
        bucketQueue.clear();
        remainingBuckets = 3;

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        while ((bucketGroup = bucketQueue.poll()) != null) {
          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
          bytesFreed += bucketGroup.free(bucketBytesToFree);
          remainingBuckets--;
        }
      }

      // Even after the above free we might still need to free more space because of
      // fragmentation of the buckets (also called the Slab Calcification problem): some buckets
      // may be so sparsely occupied that they do not yield free space for the other bucket
      // sizes. The fix is to evict some of those buckets entirely; we do this by evicting the
      // buckets that are least full.
      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));

      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Bucket cache free space completed; " + "freed="
          + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
          + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
          + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
      }

    } catch (Throwable t) {
      LOG.warn("Failed freeing space", t);
    } finally {
      cacheStats.evict();
      freeInProgress = false;
      freeSpaceLock.unlock();
    }
  }

  // This handles flushing the RAM cache to IOEngine.
  class WriterThread extends Thread {
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    private volatile boolean writerEnabled = true;
    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      super("BucketCacheWriterThread");
      this.inputQueue = queue;
    }

    // Used for test
    void disableWriter() {
      this.writerEnabled = false;
    }

    @Override
    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              if (!cacheEnabled || !writerEnabled) {
                break;
              }
            }
            doDrain(entries, metaBuff);
          } catch (Exception ioe) {
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }
  }

  /**
   * Put the new bucket entry into the backingMap. Notice that we are allowed to replace the
   * existing cache entry with a new block for the same cache key. There's a corner case: one
   * thread caches a block in ramCache, copies it to the io-engine and adds a bucket entry to
   * backingMap. Another thread caching a new block with the same cache key does the same thing;
   * if we do not evict the previous bucket entry, a memory leak happens, because the previous
   * bucketEntry is gone but the bucketAllocator never frees its memory.
   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
   *      cacheKey, Cacheable newBlock)
   * @param key         Block cache key
   * @param bucketEntry Bucket entry to put into backingMap.
   */
  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
    blocksByHFile.add(key);
    updateRegionCachedSize(key.getFilePath(), bucketEntry.getLength());
    if (previousEntry != null && previousEntry != bucketEntry) {
      previousEntry.withWriteLock(offsetLock, () -> {
        blockEvicted(key, previousEntry, false, false);
        return null;
      });
    }
  }

  /**
   * Prepare and return a warning message for a Bucket Allocator Exception
   * @param fle The exception
   * @param re  The RAMQueueEntry for which the exception was thrown.
   * @return A warning message created from the input RAMQueueEntry object.
   */
  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
    final RAMQueueEntry re) {
    final StringBuilder sb = new StringBuilder();
    sb.append("Most recent failed allocation after ");
    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
    sb.append(" ms;");
    if (re != null) {
      if (re.getData() instanceof HFileBlock) {
        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
        final String tableName = Bytes.toString(fileContext.getTableName());
        if (tableName != null) {
          sb.append(" Table: ");
          sb.append(tableName);
        }
        if (columnFamily != null) {
          sb.append(" CF: ");
          sb.append(columnFamily);
        }
        sb.append(" HFile: ");
        if (fileContext.getHFileName() != null) {
          sb.append(fileContext.getHFileName());
        } else {
          sb.append(re.getKey());
        }
      } else {
        sb.append(" HFile: ");
        sb.append(re.getKey());
      }
    }
    sb.append(" Message: ");
    sb.append(fle.getMessage());
    return sb.toString();
  }

  /**
   * Flush the entries in ramCache to the IOEngine and add bucket entries to the backingMap.
   * Process everything that is passed in, even on failure, being sure to remove entries from
   * ramCache; otherwise we would never undo the references and we would OOME.
   * @param entries Presumes the list passed in here will be processed by this invocation only.
   *                No interference expected.
   */
  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
    if (entries.isEmpty()) {
      return;
    }
    // This method is a little hard to follow. We run through the passed-in entries and for each
    // successful add, we add a non-null BucketEntry to the bucketEntries array below. Later we
    // must do cleanup, making sure we've cleared ramCache of all entries regardless of whether
    // we successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME
    // by filling ramCache. We do the cleanup by again running through the passed-in entries,
    // doing extra work when we find a non-null corresponding entry in bucketEntries.
    final int size = entries.size();
    BucketEntry[] bucketEntries = new BucketEntry[size];
    // Index updated inside the loop on success, or if we can't succeed. We retry if the cache is
    // full when we go to add an entry, by going around the loop again without upping the index.
    int index = 0;
    while (cacheEnabled && index < size) {
      RAMQueueEntry re = null;
      try {
        re = entries.get(index);
        if (re == null) {
          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
          index++;
          continue;
        }
        // Reset the position for reuse.
        // It should be guaranteed that the data in the metaBuff has been transferred to the
        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
        // transferred with our current IOEngines. Should take care when we have new kinds of
        // IOEngine in the future.
        metaBuff.clear();
        BucketEntry bucketEntry =
          re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
        bucketEntries[index] = bucketEntry;
        if (ioErrorStartTime > 0) {
          ioErrorStartTime = -1;
        }
        index++;
      } catch (BucketAllocatorException fle) {
        long currTs = EnvironmentEdgeManager.currentTime();
        cacheStats.allocationFailed(); // Record the warning.
        if (
          allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
        ) {
          LOG.warn(getAllocationFailWarningMessage(fle, re));
          allocFailLogPrevTs = currTs;
        }
        // Presume we can't add. Too big? Move index on. Entry will be cleared from ramCache below.
        bucketEntries[index] = null;
        index++;
      } catch (CacheFullException cfe) {
        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
        if (!freeInProgress) {
          freeSpace("Full!");
        } else {
          Thread.sleep(50);
        }
      } catch (IOException ioex) {
        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
        LOG.error("Failed writing to bucket cache", ioex);
        checkIOErrorIsTolerated();
      }
    }

    // Make sure data pages are written on media before we update maps.
    try {
      ioEngine.sync();
    } catch (IOException ioex) {
      LOG.error("Failed syncing IO engine", ioex);
      checkIOErrorIsTolerated();
      // Since we failed sync, free the blocks in bucket allocator
      for (int i = 0; i < entries.size(); ++i) {
        BucketEntry bucketEntry = bucketEntries[i];
        if (bucketEntry != null) {
          bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
          bucketEntries[i] = null;
        }
      }
    }

    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
    // success or error.
    for (int i = 0; i < size; ++i) {
      BlockCacheKey key = entries.get(i).getKey();
      // Only add if non-null entry.
      if (bucketEntries[i] != null) {
        putIntoBackingMap(key, bucketEntries[i]);
        if (ioEngine.isPersistent()) {
          setCacheInconsistent(true);
        }
      }
      // Always remove from ramCache even if we failed adding it to the block cache above.
      boolean existed = ramCache.remove(key, re -> {
        if (re != null) {
          heapSize.add(-1 * re.getData().heapSize());
        }
      });
      if (!existed && bucketEntries[i] != null) {
        // Block should have already been evicted. Remove it and free space.
        final BucketEntry bucketEntry = bucketEntries[i];
        bucketEntry.withWriteLock(offsetLock, () -> {
          if (backingMap.remove(key, bucketEntry)) {
            blockEvicted(key, bucketEntry, false, false);
          }
          return null;
        });
      }
    }

    long used = bucketAllocator.getUsedSize();
    if (used > acceptableSize()) {
      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
    }
    return;
  }

  /**
   * Blocks until elements are available in {@code q}, then tries to grab as many as possible
   * before returning.
   * @param receptacle Where to stash the elements taken from the queue. We clear before we use it
   *                   just in case.
   * @param q          The queue to take from.
   * @return {@code receptacle} laden with elements taken from the queue, or empty if none found.
   */
  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
    List<RAMQueueEntry> receptacle) throws InterruptedException {
    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it's OK
    // even if the list grew to accommodate thousands.
    receptacle.clear();
    receptacle.add(q.take());
    q.drainTo(receptacle);
    return receptacle;
  }

  /**
   * @see #retrieveFromFile(int[])
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
      justification = "false positive, try-with-resources ensures close is called.")
  void persistToFile() throws IOException {
    LOG.debug("Thread {} started persisting bucket cache to file",
      Thread.currentThread().getName());
    if (!isCachePersistent()) {
      throw new IOException("Attempt to persist non-persistent cache mappings!");
    }
    File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
    try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
      fos.write(ProtobufMagic.PB_MAGIC);
      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
    } catch (IOException e) {
      LOG.error("Failed to persist bucket cache to file", e);
      throw e;
    }
    LOG.debug("Thread {} finished persisting bucket cache to file, renaming",
      Thread.currentThread().getName());
    if (!tempPersistencePath.renameTo(new File(persistencePath))) {
      LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if "
        + "RS crashes/restarts before we successfully checkpoint again.");
    }
  }

  public boolean isCachePersistent() {
    return ioEngine.isPersistent() && persistencePath != null;
  }

  @Override
  public Optional<Map<String, Long>> getRegionCachedInfo() {
    return Optional.of(Collections.unmodifiableMap(regionCachedSize));
  }

  /**
   * @see #persistToFile()
   */
  private void retrieveFromFile(int[] bucketSizes) throws IOException {
    LOG.info("Started retrieving bucket cache from file");
    File persistenceFile = new File(persistencePath);
    if (!persistenceFile.exists()) {
      LOG.warn("Persistence file missing! "
" 1359 + "It's ok if it's first run after enabling persistent cache."); 1360 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1361 blockNumber.add(backingMap.size()); 1362 backingMapValidated.set(true); 1363 return; 1364 } 1365 assert !cacheEnabled; 1366 1367 try (FileInputStream in = new FileInputStream(persistenceFile)) { 1368 int pblen = ProtobufMagic.lengthOfPBMagic(); 1369 byte[] pbuf = new byte[pblen]; 1370 int read = in.read(pbuf); 1371 if (read != pblen) { 1372 throw new IOException("Incorrect number of bytes read while checking for protobuf magic " 1373 + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath); 1374 } 1375 if (!ProtobufMagic.isPBMagicPrefix(pbuf)) { 1376 // In 3.0 we have enough flexibility to dump the old cache data. 1377 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1378 throw new IOException( 1379 "Persistence file does not start with protobuf magic number. " + persistencePath); 1380 } 1381 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1382 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1383 blockNumber.add(backingMap.size()); 1384 LOG.info("Bucket cache retrieved from file successfully"); 1385 } 1386 } 1387 1388 private void updateRegionSizeMapWhileRetrievingFromFile() { 1389 // Update the regionCachedSize with the region size while restarting the region server 1390 if (LOG.isDebugEnabled()) { 1391 LOG.debug("Updating region size map after retrieving cached file list"); 1392 dumpPrefetchList(); 1393 } 1394 regionCachedSize.clear(); 1395 fullyCachedFiles.forEach((hFileName, hFileSize) -> { 1396 // Get the region name for each file 1397 String regionEncodedName = hFileSize.getFirst(); 1398 long cachedFileSize = hFileSize.getSecond(); 1399 regionCachedSize.merge(regionEncodedName, cachedFileSize, 1400 (oldpf, fileSize) -> oldpf + fileSize); 1401 }); 1402 } 1403 1404 private void dumpPrefetchList() { 1405 for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { 1406 LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), 1407 outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); 1408 } 1409 } 1410 1411 /** 1412 * Create an input stream that deletes the file after reading it. Use in try-with-resources to 1413 * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: 1414 * 1415 * <pre> 1416 * File f = ... 1417 * try (FileInputStream fis = new FileInputStream(f)) { 1418 * // use the input stream 1419 * } finally { 1420 * if (!f.delete()) throw new IOException("failed to delete"); 1421 * } 1422 * </pre> 1423 * 1424 * @param file the file to read and delete 1425 * @return a FileInputStream for the given file 1426 * @throws IOException if there is a problem creating the stream 1427 */ 1428 private FileInputStream deleteFileOnClose(final File file) throws IOException { 1429 return new FileInputStream(file) { 1430 private File myFile; 1431 1432 private FileInputStream init(File file) { 1433 myFile = file; 1434 return this; 1435 } 1436 1437 @Override 1438 public void close() throws IOException { 1439 // close() will be called during try-with-resources and it will be 1440 // called by finalizer thread during GC. To avoid double-free resource, 1441 // set myFile to null after the first call. 
  public boolean isCachePersistent() {
    return ioEngine.isPersistent() && persistencePath != null;
  }

  @Override
  public Optional<Map<String, Long>> getRegionCachedInfo() {
    return Optional.of(Collections.unmodifiableMap(regionCachedSize));
  }

  /**
   * @see #persistToFile()
   */
  private void retrieveFromFile(int[] bucketSizes) throws IOException {
    LOG.info("Started retrieving bucket cache from file");
    File persistenceFile = new File(persistencePath);
    if (!persistenceFile.exists()) {
      LOG.warn("Persistence file missing! "
        + "This is OK if it's the first run after enabling the persistent cache.");
      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
      blockNumber.add(backingMap.size());
      backingMapValidated.set(true);
      return;
    }
    assert !cacheEnabled;

    try (FileInputStream in = new FileInputStream(persistenceFile)) {
      int pblen = ProtobufMagic.lengthOfPBMagic();
      byte[] pbuf = new byte[pblen];
      int read = in.read(pbuf);
      if (read != pblen) {
        throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
          + "number. Requested=" + pblen + ", Received=" + read + ", File=" + persistencePath);
      }
      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
        // In 3.0 we have enough flexibility to dump the old cache data.
        // TODO: In the 2.x line, this might need to be filled in to support reading the old format
        throw new IOException(
          "Persistence file does not start with protobuf magic number. " + persistencePath);
      }
      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
      blockNumber.add(backingMap.size());
      LOG.info("Bucket cache retrieved from file successfully");
    }
  }

  private void updateRegionSizeMapWhileRetrievingFromFile() {
    // Update regionCachedSize with the per-region sizes while restarting the region server
    if (LOG.isDebugEnabled()) {
      LOG.debug("Updating region size map after retrieving cached file list");
      dumpPrefetchList();
    }
    regionCachedSize.clear();
    fullyCachedFiles.forEach((hFileName, hFileSize) -> {
      // Get the region name for each file
      String regionEncodedName = hFileSize.getFirst();
      long cachedFileSize = hFileSize.getSecond();
      regionCachedSize.merge(regionEncodedName, cachedFileSize,
        (oldpf, fileSize) -> oldpf + fileSize);
    });
  }

  private void dumpPrefetchList() {
    for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) {
      LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(),
        outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond());
    }
  }

  /**
   * Create an input stream that deletes the file after reading it. Use in try-with-resources to
   * avoid this pattern, where an exception thrown from a finally block may mask earlier
   * exceptions:
   *
   * <pre>
   *   File f = ...
   *   try (FileInputStream fis = new FileInputStream(f)) {
   *     // use the input stream
   *   } finally {
   *     if (!f.delete()) throw new IOException("failed to delete");
   *   }
   * </pre>
   *
   * @param file the file to read and delete
   * @return a FileInputStream for the given file
   * @throws IOException if there is a problem creating the stream
   */
  private FileInputStream deleteFileOnClose(final File file) throws IOException {
    return new FileInputStream(file) {
      private File myFile;

      private FileInputStream init(File file) {
        myFile = file;
        return this;
      }

      @Override
      public void close() throws IOException {
        // close() will be called during try-with-resources and it will be
        // called by the finalizer thread during GC. To avoid double-freeing the resource,
        // set myFile to null after the first call.
        if (myFile == null) {
          return;
        }

        super.close();
        if (!myFile.delete()) {
          throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
        }
        myFile = null;
      }
    }.init(file);
  }
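  // Illustrative sketch (not part of this class): the intended correct usage of
  // deleteFileOnClose. try-with-resources closes the stream, and the overridden close() then
  // deletes the file, so a delete failure surfaces as a normal IOException instead of masking an
  // earlier one from a finally block:
  //
  // try (FileInputStream in = deleteFileOnClose(new File(persistencePath))) {
  //   // read the persisted cache state from 'in'
  // } // close() deletes the file; an IOException here propagates normally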
  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
    throws IOException {
    if (capacitySize != cacheCapacity) {
      throw new IOException("Mismatched cache capacity: " + StringUtils.byteDesc(capacitySize)
        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
    }
    if (!ioEngine.getClass().getName().equals(ioclass)) {
      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected: "
        + ioEngine.getClass().getName());
    }
    if (!backingMap.getClass().getName().equals(mapclass)) {
      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected: "
        + backingMap.getClass().getName());
    }
  }

  private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
      BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
        this::createRecycler);
    backingMap = pair.getFirst();
    blocksByHFile = pair.getSecond();
    fullyCachedFiles.clear();
    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
    if (proto.hasChecksum()) {
      try {
        ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
          algorithm);
        backingMapValidated.set(true);
      } catch (IOException e) {
        LOG.warn("Checksum for cache file failed. "
          + "We need to validate each cache key in the backing map. "
          + "This may take some time, so we'll do it in a background thread.");
        Runnable cacheValidator = () -> {
          while (bucketAllocator == null) {
            try {
              Thread.sleep(50);
            } catch (InterruptedException ex) {
              throw new RuntimeException(ex);
            }
          }
          long startTime = EnvironmentEdgeManager.currentTime();
          int totalKeysOriginally = backingMap.size();
          for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
            try {
              ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
            } catch (IOException e1) {
              LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
              evictBlock(keyEntry.getKey());
              fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
            }
          }
          backingMapValidated.set(true);
          LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
            totalKeysOriginally, backingMap.size(),
            (EnvironmentEdgeManager.currentTime() - startTime));
        };
        Thread t = new Thread(cacheValidator);
        t.setDaemon(true);
        t.start();
      }
    } else {
      // If there is no checksum, the persistence file is in the old format.
      LOG.info("Persistence file is in the old format, it does not support verifying file "
        + "integrity!");
      backingMapValidated.set(true);
    }
    updateRegionSizeMapWhileRetrievingFromFile();
    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
  }

  /**
   * Check whether we tolerate the IO error this time. If the IOEngine keeps throwing errors for
   * longer than ioErrorsTolerationDuration, we will disable the cache.
   */
  private void checkIOErrorIsTolerated() {
    long now = EnvironmentEdgeManager.currentTime();
    // Do a single read to a local variable to avoid timing issues - HBASE-24454
    long ioErrorStartTimeTmp = this.ioErrorStartTime;
    if (ioErrorStartTimeTmp > 0) {
      if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
          + "ms, disabling cache, please check your IOEngine");
        disableCache();
      }
    } else {
      this.ioErrorStartTime = now;
    }
  }
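  // Illustrative sketch (an assumption read off checkIOErrorIsTolerated above, not additional
  // behavior): the toleration window is a timeline anchored at the first error.
  //
  //   t0: first IO error          -> ioErrorStartTime = t0, cache stays enabled
  //   t0 < t <= t0 + toleration   -> further errors are tolerated
  //   t  > t0 + toleration        -> disableCache() runs on the next error check
  //
  // Presumably (an assumption here), successful IO elsewhere in the class resets
  // ioErrorStartTime, restarting the window for the next burst of errors.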
  /**
   * Used to shut down the cache -or- turn it off in the case of something broken.
   */
  private void disableCache() {
    if (!cacheEnabled) {
      return;
    }
    LOG.info("Disabling cache");
    cacheEnabled = false;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i].interrupt();
    }
    this.ramCache.clear();
    if (!ioEngine.isPersistent() || persistencePath == null) {
      // Not a persistent ioEngine, or no path to persist to: the backingMap won't be serialized
      // out, so drop the in-memory state as well.
      this.backingMap.clear();
      this.blocksByHFile.clear();
      this.fullyCachedFiles.clear();
      this.regionCachedSize.clear();
    }
  }

  private void join() throws InterruptedException {
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i].join();
    }
  }

  @Override
  public void shutdown() {
    disableCache();
    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
      + persistencePath);
    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        join();
        if (cachePersister != null) {
          LOG.info("Shutting down cache persister thread.");
          cachePersister.shutdown();
          while (cachePersister.isAlive()) {
            Thread.sleep(10);
          }
        }
        persistToFile();
      } catch (IOException ex) {
        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
      } catch (InterruptedException e) {
        LOG.warn("Failed to persist data on exit", e);
      }
    }
  }

  /**
   * Needed mostly for UTs that might run in the same VM and create different BucketCache instances
   * in different UT methods.
   */
  @Override
  protected void finalize() {
    if (cachePersister != null && !cachePersister.isInterrupted()) {
      cachePersister.interrupt();
    }
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  public BucketAllocator getAllocator() {
    return this.bucketAllocator;
  }

  @Override
  public long heapSize() {
    return this.heapSize.sum();
  }

  @Override
  public long size() {
    return this.realCacheSize.sum();
  }

  @Override
  public long getCurrentDataSize() {
    return size();
  }

  @Override
  public long getFreeSize() {
    return this.bucketAllocator.getFreeSize();
  }

  @Override
  public long getBlockCount() {
    return this.blockNumber.sum();
  }

  @Override
  public long getDataBlockCount() {
    return getBlockCount();
  }

  @Override
  public long getCurrentSize() {
    return this.bucketAllocator.getUsedSize();
  }

  protected String getAlgorithm() {
    return algorithm;
  }

  /**
   * Evicts all blocks for a specific HFile.
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    fileNotFullyCached(hfileName);
    Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName);
    int numEvicted = 0;
    for (BlockCacheKey key : keySet) {
      if (evictBlock(key)) {
        ++numEvicted;
      }
    }
    return numEvicted;
  }

  private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName) {
    return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
      new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
  }
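  // Illustrative sketch (an assumption about the blocksByHFile ordering, read off the subSet
  // call above): keys sort by hfile name first and offset second, so all blocks of one file form
  // a contiguous range bracketed by the two sentinel keys:
  //
  //   ("a.hfile", MIN_VALUE) <= ("a.hfile", 0) ... ("a.hfile", n) <= ("a.hfile", MAX_VALUE)
  //
  // which lets getAllCacheKeysForFile return every cached block of a file with a range view of
  // the navigable set instead of a full scan.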
  /**
   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
   * number of elements out of each according to configuration parameters and their relative sizes.
   */
  private class BucketEntryGroup {

    private CachedEntryQueue queue;
    private long totalSize = 0;
    private long bucketSize;

    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
      this.bucketSize = bucketSize;
      queue = new CachedEntryQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
      totalSize += block.getValue().getLength();
      queue.add(block);
    }

    public long free(long toFree) {
      Map.Entry<BlockCacheKey, BucketEntry> entry;
      long freedBytes = 0;
      // TODO: avoid a cycling situation. We may find no block that is not in use, and then there
      // is no way to free space. What to do then? Fail the caching attempt? Needs some changes in
      // the cacheBlock API?
      while ((entry = queue.pollLast()) != null) {
        BlockCacheKey blockCacheKey = entry.getKey();
        BucketEntry be = entry.getValue();
        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
          freedBytes += be.getLength();
        }
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }
  }

  /**
   * Block entry stored in memory, with its key, data and so on.
   */
  static class RAMQueueEntry {
    private final BlockCacheKey key;
    private final Cacheable data;
    private long accessCounter;
    private boolean inMemory;
    private boolean isCachePersistent;

    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
      boolean isCachePersistent) {
      this.key = bck;
      this.data = data;
      this.accessCounter = accessCounter;
      this.inMemory = inMemory;
      this.isCachePersistent = isCachePersistent;
    }

    public Cacheable getData() {
      return data;
    }

    public BlockCacheKey getKey() {
      return key;
    }

    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
    }

    private ByteBuffAllocator getByteBuffAllocator() {
      if (data instanceof HFileBlock) {
        return ((HFileBlock) data).getByteBuffAllocator();
      }
      return ByteBuffAllocator.HEAP;
    }

    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
      ByteBuffer metaBuff) throws IOException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized
      if (len == 0) {
        return null;
      }
      if (isCachePersistent && data instanceof HFileBlock) {
        len += Long.BYTES; // we need to record the cache time for the consistency check done
                           // during recovery
      }
      long offset = alloc.allocateBlock(len);
      boolean succ = false;
      BucketEntry bucketEntry = null;
      try {
        int diskSizeWithHeader = (data instanceof HFileBlock)
          ? ((HFileBlock) data).getOnDiskSizeWithHeader()
          : data.getSerializedLength();
        bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory,
          createRecycler, getByteBuffAllocator());
        bucketEntry.setDeserializerReference(data.getDeserializer());
        if (data instanceof HFileBlock) {
          // If an instance of HFileBlock, save on some allocations.
          HFileBlock block = (HFileBlock) data;
          ByteBuff sliceBuf = block.getBufferReadOnly();
          block.getMetaData(metaBuff);
          // Write the cache time ahead of the block and metadata parts
          if (isCachePersistent) {
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(bucketEntry.getCachedTime());
            buffer.rewind();
            ioEngine.write(buffer, offset);
            ioEngine.write(sliceBuf, (offset + Long.BYTES));
          } else {
            ioEngine.write(sliceBuf, offset);
          }
          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
        } else {
          // Only used for testing.
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb, true);
          ioEngine.write(bb, offset);
        }
        succ = true;
      } finally {
        if (!succ) {
          alloc.freeBlock(offset, len);
        }
      }
      realCacheSize.add(len);
      return bucketEntry;
    }
  }
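  // Illustrative sketch (an assumption read off writeToCache above, not a normative format): the
  // layout of one HFileBlock entry inside the allocated bucket region of length 'len', where
  // 'meta' is metaBuff.limit() bytes.
  //
  // Persistent cache:
  //   offset                offset+8                  offset+len-meta     offset+len
  //   +---------------------+-------------------------+-------------------+
  //   | cachedTime (8 bytes)| block data (sliceBuf)   | metadata          |
  //   +---------------------+-------------------------+-------------------+
  //
  // Non-persistent cache: the same minus the leading 8-byte cachedTime. Either way the metadata
  // is written at offset + len - metaBuff.limit(), i.e. flush against the end of the region.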
  /**
   * Only used in test
   */
  void stopWriterThreads() throws InterruptedException {
    for (WriterThread writerThread : writerThreads) {
      writerThread.disableWriter();
      writerThread.interrupt();
      writerThread.join();
    }
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with ramcache since stuff is in here only a little while.
    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return e.getValue().getPriority();
          }

          @Override
          public BlockType getBlockType() {
            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
            return null;
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return e.getValue().getLength();
          }

          @Override
          public long getCachedTime() {
            return e.getValue().getCachedTime();
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) {
              return diff;
            }
            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) {
              return diff;
            }
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException(
                "" + this.getCachedTime() + ", " + other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return e.getKey().hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock) obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  public int getRpcRefCount(BlockCacheKey cacheKey) {
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry != null) {
      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
    }
    return 0;
  }

  float getAcceptableFactor() {
    return acceptableFactor;
  }

  float getMinFactor() {
    return minFactor;
  }

  float getExtraFreeFactor() {
    return extraFreeFactor;
  }

  float getSingleFactor() {
    return singleFactor;
  }

  float getMultiFactor() {
    return multiFactor;
  }

  float getMemoryFactor() {
    return memoryFactor;
  }

  public String getPersistencePath() {
    return persistencePath;
  }
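  // Illustrative sketch (not part of this class): iterating the cache for monitoring. The
  // iterator() above wraps backingMap entries as read-only CachedBlock views; 'cache' is an
  // assumed BucketCache reference.
  //
  // for (CachedBlock cb : cache) {
  //   System.out.println(cb.getFilename() + "@" + cb.getOffset() + " size=" + cb.getSize());
  // }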
  /**
   * Wraps the delegate ConcurrentMap while maintaining the reference counts of its blocks.
   */
  static class RAMCache {
    /**
     * The map is declared as a {@link ConcurrentHashMap} explicitly here because in
     * {@link RAMCache#get(BlockCacheKey)} and
     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} we need to guarantee
     * the atomicity of map#computeIfPresent(key, func) and map#computeIfAbsent(key, func): the
     * func must execute exactly once, and only while the key is present (or absent) and the lock
     * is held. Otherwise, the reference count of the block gets corrupted. Note that
     * {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that.
     */
    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();

    public boolean containsKey(BlockCacheKey key) {
      return delegate.containsKey(key);
    }

    public RAMQueueEntry get(BlockCacheKey key) {
      return delegate.computeIfPresent(key, (k, re) -> {
        // It'll be referenced by RPC, so retain it atomically here. If the get and the retain
        // were not atomic, another thread might remove and release the block in between, and
        // retaining in this thread could then retain a block with refCnt=0, which is disallowed
        // (see HBASE-22422).
        re.getData().retain();
        return re;
      });
    }

    /**
     * Return the previous associated value, or null if absent. It has the same meaning as
     * {@link ConcurrentMap#putIfAbsent(Object, Object)}.
     */
    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
      AtomicBoolean absent = new AtomicBoolean(false);
      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
        // The RAMCache now references this entry, so its reference count should be incremented.
        entry.getData().retain();
        absent.set(true);
        return entry;
      });
      return absent.get() ? null : re;
    }
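    // Illustrative sketch (not part of this class): the race that the atomic compute methods
    // above prevent. With a non-atomic get-then-retain, this interleaving could retain an
    // already-released block:
    //
    //   T1: re = map.get(key)                          // refCnt == 1
    //   T2: map.remove(key); re.getData().release();   // refCnt == 0, buffers recycled
    //   T1: re.getData().retain();                     // illegal: retaining at refCnt == 0
    //
    // computeIfPresent runs the retain while ConcurrentHashMap holds the lock for the key's bin,
    // so the remove/release in T2 cannot interleave between the lookup and the retain.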
    public boolean remove(BlockCacheKey key) {
      return remove(key, re -> {
      });
    }

    /**
     * A {@link Consumer} is taken here because, once the removed entry releases its reference
     * count, its ByteBuffers may be recycled, and accessing them outside this method would throw
     * an exception. The consumer accesses the entry to be removed before its reference count is
     * released. Note: don't change the reference count inside the {@link Consumer}.
     */
    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
      RAMQueueEntry previous = delegate.remove(key);
      action.accept(previous);
      if (previous != null) {
        previous.getData().release();
      }
      return previous != null;
    }

    public boolean isEmpty() {
      return delegate.isEmpty();
    }

    public void clear() {
      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
      while (it.hasNext()) {
        RAMQueueEntry re = it.next().getValue();
        it.remove();
        re.getData().release();
      }
    }

    public boolean hasBlocksForFile(String fileName) {
      return delegate.keySet().stream().anyMatch(key -> key.getHfileName().equals(fileName));
    }
  }

  public Map<BlockCacheKey, BucketEntry> getBackingMap() {
    return backingMap;
  }

  public AtomicBoolean getBackingMapValidated() {
    return backingMapValidated;
  }

  @Override
  public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
    return Optional.of(fullyCachedFiles);
  }

  public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
    if (cacheConf.getBlockCache().isPresent()) {
      BlockCache bc = cacheConf.getBlockCache().get();
      if (bc instanceof CombinedBlockCache) {
        BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
        if (l2 instanceof BucketCache) {
          return Optional.of((BucketCache) l2);
        }
      } else if (bc instanceof BucketCache) {
        return Optional.of((BucketCache) bc);
      }
    }
    return Optional.empty();
  }
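  // Illustrative sketch (not part of this class): typical use of getBucketCacheFromCacheConfig.
  // It unwraps a CombinedBlockCache down to its second-level cache when needed; 'cacheConf' is
  // an assumed CacheConfig instance.
  //
  // Optional<BucketCache> bucketCache = BucketCache.getBucketCacheFromCacheConfig(cacheConf);
  // bucketCache.ifPresent(bc -> LOG.info("Bucket cache free space: {}", bc.getFreeSize()));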
  @Override
  public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
    long size) {
    // Block eviction may be happening in the background as prefetch runs,
    // so we need to count all blocks for this file in the backing map under
    // a read lock for the block offset
    final List<ReentrantReadWriteLock> locks = new ArrayList<>();
    LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
      fileName, totalBlockCount, dataBlockCount);
    try {
      final MutableInt count = new MutableInt();
      LOG.debug("iterating over {} entries in the backing map", backingMap.size());
      backingMap.entrySet().stream().forEach(entry -> {
        if (
          entry.getKey().getHfileName().equals(fileName.getName())
            && entry.getKey().getBlockType().equals(BlockType.DATA)
        ) {
          long offsetToLock = entry.getValue().offset();
          LOG.debug("found block {} in the backing map. Acquiring read lock for offset {}",
            entry.getKey(), offsetToLock);
          ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock);
          lock.readLock().lock();
          locks.add(lock);
          // Re-check that the given key is still there (no eviction happened before the lock
          // was acquired)
          if (backingMap.containsKey(entry.getKey())) {
            count.increment();
          } else {
            lock.readLock().unlock();
            locks.remove(lock);
            LOG.debug("found block {}, but when locked and tried to count, it was gone.",
              entry.getKey());
          }
        }
      });
      int metaCount = totalBlockCount - dataBlockCount;
      // BucketCache would only have data blocks
      if (dataBlockCount == count.getValue()) {
        LOG.debug("File {} has now been fully cached.", fileName);
        fileCacheCompleted(fileName, size);
      } else {
        LOG.debug(
          "Prefetch executor completed for {}, but only {} data blocks were cached. "
            + "Total data blocks for file: {}. "
            + "Checking for blocks pending cache in cache writer queue.",
          fileName, count.getValue(), dataBlockCount);
        if (ramCache.hasBlocksForFile(fileName.getName())) {
          for (ReentrantReadWriteLock lock : locks) {
            lock.readLock().unlock();
          }
          // Clear the list so the finally block doesn't unlock these read locks a second time,
          // which would throw IllegalMonitorStateException.
          locks.clear();
          LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
            + "and try the verification again.", fileName);
          Thread.sleep(100);
          notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
        } else if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount)
            == dataBlockCount) {
          LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in "
            + "the cache write queue but we now found that total cached blocks for file {} "
            + "is equal to data block count.", count, dataBlockCount, fileName.getName());
          fileCacheCompleted(fileName, size);
        } else {
          LOG.info("We found only {} data blocks cached from a total of {} for file {}, "
            + "but no blocks pending caching. Maybe cache is full or evictions "
            + "happened concurrently to cache prefetch.", count, dataBlockCount, fileName);
        }
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      for (ReentrantReadWriteLock lock : locks) {
        lock.readLock().unlock();
      }
    }
  }

  @Override
  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
    long currentUsed = bucketAllocator.getUsedSize();
    boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize();
    return Optional.of(result);
  }

  @Override
  public Optional<Boolean> shouldCacheFile(String fileName) {
    // If we don't have the file in fullyCachedFiles, we should cache it
    return Optional.of(!fullyCachedFiles.containsKey(fileName));
  }

  @Override
  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
    return Optional.of(getBackingMap().containsKey(key));
  }

  @Override
  public Optional<Integer> getBlockSize(BlockCacheKey key) {
    BucketEntry entry = backingMap.get(key);
    if (entry == null) {
      return Optional.empty();
    } else {
      return Optional.of(entry.getOnDiskSizeWithHeader());
    }
  }
}