001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.FileOutputStream; 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.Comparator; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.NavigableSet; 036import java.util.Optional; 037import java.util.PriorityQueue; 038import java.util.Set; 039import java.util.concurrent.ArrayBlockingQueue; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ConcurrentMap; 043import java.util.concurrent.ConcurrentSkipListSet; 044import java.util.concurrent.Executors; 045import java.util.concurrent.ScheduledExecutorService; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicLong; 049import java.util.concurrent.atomic.LongAdder; 050import java.util.concurrent.locks.Lock; 051import java.util.concurrent.locks.ReentrantLock; 052import java.util.concurrent.locks.ReentrantReadWriteLock; 053import java.util.function.Consumer; 054import java.util.function.Function; 055import org.apache.commons.lang3.mutable.MutableInt; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseIOException; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.client.Admin; 062import org.apache.hadoop.hbase.io.ByteBuffAllocator; 063import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 064import org.apache.hadoop.hbase.io.HeapSize; 065import org.apache.hadoop.hbase.io.hfile.BlockCache; 066import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 067import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 068import org.apache.hadoop.hbase.io.hfile.BlockPriority; 069import org.apache.hadoop.hbase.io.hfile.BlockType; 070import org.apache.hadoop.hbase.io.hfile.CacheConfig; 071import org.apache.hadoop.hbase.io.hfile.CacheStats; 072import org.apache.hadoop.hbase.io.hfile.Cacheable; 073import org.apache.hadoop.hbase.io.hfile.CachedBlock; 074import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 075import org.apache.hadoop.hbase.io.hfile.HFileBlock; 076import org.apache.hadoop.hbase.io.hfile.HFileContext; 077import org.apache.hadoop.hbase.nio.ByteBuff; 078import 
org.apache.hadoop.hbase.nio.RefCnt; 079import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 080import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 081import org.apache.hadoop.hbase.util.Bytes; 082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 083import org.apache.hadoop.hbase.util.IdReadWriteLock; 084import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef; 085import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; 086import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; 087import org.apache.hadoop.hbase.util.Pair; 088import org.apache.hadoop.util.StringUtils; 089import org.apache.yetus.audience.InterfaceAudience; 090import org.slf4j.Logger; 091import org.slf4j.LoggerFactory; 092 093import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 094import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 095 096import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 097 098/** 099 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache 100 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket 101 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap 102 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files 103 * {@link FileIOEngine} to store/read the block data. 104 * <p> 105 * Eviction is via a similar algorithm as used in 106 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 107 * <p> 108 * BucketCache can be used as mainly a block cache (see 109 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to 110 * decrease CMS GC and heap fragmentation. 111 * <p> 112 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to 113 * enlarge cache space via a victim cache. 
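 * <p>
 * In a deployment the bucket cache is normally enabled through configuration (for example the
 * {@code hbase.bucketcache.ioengine} and {@code hbase.bucketcache.size} properties) rather than
 * constructed directly. A minimal, illustrative construction sketch, using example values only:
 *
 * <pre>
 * // 4 GB off-heap bucket cache with 16 KB blocks, default bucket sizes (null bucketSizes),
 * // the default writer thread and queue settings, and no persistence path.
 * BucketCache cache = new BucketCache("offheap", 4L * 1024 * 1024 * 1024, 16 * 1024, null,
 *   BucketCache.DEFAULT_WRITER_THREADS, BucketCache.DEFAULT_WRITER_QUEUE_ITEMS, null);
 * </pre>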
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Priority buckets config */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
  static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
    "hbase.bucketcache.persistence.chunksize";

  /** Use strong reference for offsetLock or not */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /** Priority buckets */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of entire buckets to free for each bucket size that is full
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Statistics thread */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to the cache
  transient final RAMCache ramCache;

  // In this map, store the block's metadata, like offset and length
  transient Map<BlockCacheKey, BucketEntry> backingMap;

  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);

  /**
   * Map of hFile -> (region, file size). This map is used to track all files that have completed
   * prefetch, together with the region they belong to and the total cached size for that region.
   */
  final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();
  /**
   * Map of region -> total size of the region prefetched on this region server. This is the total
   * size of hFiles for this region prefetched on this region server.
   */
  final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>();

  private BucketCachePersister cachePersister;

  /**
   * Enum to represent the state of the cache
   */
  protected enum CacheState {
    // Initializing: State when the cache is being initialised from persistence.
    INITIALIZING,
    // Enabled: State when the cache is initialised and is ready.
    ENABLED,
    // Disabled: State when the cache is disabled.
    DISABLED
  }

  /**
   * Flag for whether the cache is enabled or not. We shut it off if there are IO errors for some
   * time, so that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile CacheState cacheState;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work of adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. It's done by taking the hash of the cache key modulo the queue count. When a
   * WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the
   * BucketCache. It then updates the ramCache and backingMap accordingly.
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track whether a free-space run is in progress or not */
  private volatile boolean freeInProgress = false;
  private transient final Lock freeSpaceLock = new ReentrantLock();

  private final LongAdder realCacheSize = new LongAdder();
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  private final BucketCacheStats cacheStats = new BucketCacheStats();
  private final String persistencePath;
  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable the cache, 1 minute by default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of the first IO error when reading or writing the IO Engine; it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  private Configuration conf;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing a block while it is being read.
   */
  transient final IdReadWriteLock<Long> offsetLock;

  NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
    Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example, free the number of
   * blocks requested * (1 + extraFreeFactor).
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  private long bucketcachePersistInterval;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  private static final String QUEUE_ADDITION_WAIT_TIME =
    "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  private long queueAdditionWaitTime;
  /**
   * Use {@link java.security.MessageDigest} class's digest algorithms to check persistent file
   * integrity; the default algorithm is MD5.
   */
  private String algorithm;

  private long persistenceChunkSize;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    Preconditions.checkArgument(blockSize > 0,
      "BucketCache blockSize is set to " + blockSize + ", it must be greater than 0");
    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
    if (useStrongRef) {
      this.offsetLock = new IdReadWriteLockStrongRef<>();
    } else {
      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
    }
    this.conf = conf;
    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
    this.writerThreads = new WriterThread[writerThreadNum];
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
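      // The block count (capacity / blockSize) has to fit in an int, since it seeds the initial
      // capacity of the backingMap below; reject configurations that would overflow it.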
323 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 324 } 325 326 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 327 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 328 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 329 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 330 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 331 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 332 this.queueAdditionWaitTime = 333 conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); 334 this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); 335 this.persistenceChunkSize = 336 conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE); 337 if (this.persistenceChunkSize <= 0) { 338 persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 339 } 340 341 sanityCheckConfigs(); 342 343 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 344 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 345 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor 346 + ", useStrongRef: " + useStrongRef); 347 348 this.cacheCapacity = capacity; 349 this.persistencePath = persistencePath; 350 this.blockSize = blockSize; 351 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 352 this.cacheState = CacheState.INITIALIZING; 353 354 this.allocFailLogPrevTs = 0; 355 356 for (int i = 0; i < writerThreads.length; ++i) { 357 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 358 } 359 360 assert writerQueues.size() == writerThreads.length; 361 this.ramCache = new RAMCache(); 362 363 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 364 instantiateWriterThreads(); 365 366 if (isCachePersistent()) { 367 if (ioEngine instanceof FileIOEngine) { 368 startBucketCachePersisterThread(); 369 } 370 startPersistenceRetriever(bucketSizes, capacity); 371 } else { 372 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 373 this.cacheState = CacheState.ENABLED; 374 startWriterThreads(); 375 } 376 377 // Run the statistics thread periodically to print the cache statistics log 378 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 379 // every five minutes. 380 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 381 statThreadPeriod, TimeUnit.SECONDS); 382 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 383 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 384 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 385 + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName()); 386 } 387 388 private void startPersistenceRetriever(int[] bucketSizes, long capacity) { 389 Runnable persistentCacheRetriever = () -> { 390 try { 391 retrieveFromFile(bucketSizes); 392 LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); 393 } catch (Throwable ex) { 394 LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt." 
395 + " Exception seen: ", persistencePath, ex); 396 backingMap.clear(); 397 fullyCachedFiles.clear(); 398 backingMapValidated.set(true); 399 regionCachedSize.clear(); 400 try { 401 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 402 } catch (BucketAllocatorException allocatorException) { 403 LOG.error("Exception during Bucket Allocation", allocatorException); 404 } 405 } finally { 406 this.cacheState = CacheState.ENABLED; 407 startWriterThreads(); 408 } 409 }; 410 Thread t = new Thread(persistentCacheRetriever); 411 t.start(); 412 } 413 414 private void sanityCheckConfigs() { 415 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, 416 ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 417 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, 418 MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 419 Preconditions.checkArgument(minFactor <= acceptableFactor, 420 MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 421 Preconditions.checkArgument(extraFreeFactor >= 0, 422 EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 423 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, 424 SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 425 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, 426 MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 427 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, 428 MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 429 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, 430 SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and " 431 + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 432 } 433 434 /** 435 * Called by the constructor to instantiate the writer threads. 436 */ 437 private void instantiateWriterThreads() { 438 final String threadName = Thread.currentThread().getName(); 439 for (int i = 0; i < this.writerThreads.length; ++i) { 440 this.writerThreads[i] = new WriterThread(this.writerQueues.get(i)); 441 this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 442 this.writerThreads[i].setDaemon(true); 443 } 444 } 445 446 /** 447 * Called by the constructor to start the writer threads. Used by tests that need to override 448 * starting the threads. 
 */
  protected void startWriterThreads() {
    for (WriterThread thread : writerThreads) {
      thread.start();
    }
  }

  void startBucketCachePersisterThread() {
    LOG.info("Starting BucketCachePersisterThread");
    cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
    cachePersister.setDaemon(true);
    cachePersister.start();
  }

  @Override
  public boolean isCacheEnabled() {
    return this.cacheState == CacheState.ENABLED;
  }

  @Override
  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }

  /**
   * Get the IOEngine from the IO engine name
   * @return the IOEngine
   */
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
    throws IOException {
    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
      // To keep the usage simple, we document only the 'files:' prefix for one or multiple
      // file(s), but we also support 'file:' for compatibility.
      String[] filePaths =
        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
      return new FileIOEngine(capacity, persistencePath != null, filePaths);
    } else if (ioEngineName.startsWith("offheap")) {
      return new ByteBufferIOEngine(capacity);
    } else if (ioEngineName.startsWith("mmap:")) {
      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else if (ioEngineName.startsWith("pmem:")) {
      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
      // device. Since the persistent memory device has its own address space, the contents
      // mapped to this address space do not get swapped out like in the case of mmapping
      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
      // can be referred to directly without having to copy them onheap. Once the RPC is done,
      // the blocks can be returned back as in the case of ByteBufferIOEngine.
      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else {
      throw new IllegalArgumentException(
        "Don't understand io engine name for cache - prefix with file:, files:, mmap:, pmem: or offheap");
    }
  }

  public boolean isCachePersistenceEnabled() {
    return persistencePath != null;
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey      block's cache key
   * @param cachedItem    block buffer
   * @param inMemory      if block is in-memory
   * @param waitWhenCache if true and a positive queue addition wait time is configured, wait
   *                      briefly for room in the writer queue rather than failing the insert
   *                      immediately
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean waitWhenCache) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
  }

  /**
   * Cache the block to ramCache
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   * @param wait       if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean wait) {
    if (isCacheEnabled()) {
      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
          BucketEntry bucketEntry = backingMap.get(cacheKey);
          if (bucketEntry != null && bucketEntry.isRpcRef()) {
            // avoid replacing when there are RPC refs for the bucket entry in the bucket cache
            return;
          }
          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
        }
      } else {
        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
      }
    }
  }

  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
  }

  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
    boolean inMemory, boolean wait) {
    if (!isCacheEnabled()) {
      return;
    }
    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
      cacheKey.setBlockType(cachedItem.getBlockType());
    }
    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
    // Stuff the entry into the RAM cache so it can get drained to the persistent store
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
    /**
     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
     * same key in ramCache, and the heap size of the bucket cache needs to be updated if we
     * replace an entry from ramCache. But the WriterThread also removes entries from ramCache and
     * updates the heap size; if we used ramCache.put(), the entry removed in the WriterThread
     * might not be the correct one, and then the heap size accounting would get messed up
     * (HBASE-20789).
     */
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey);
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.increment();
      this.heapSize.add(cachedItem.heapSize());
    }
  }

  /**
   * If the passed cache key relates to a reference (<hfile>.<parentEncRegion>), this
   * method looks for the block from the referred file in the cache.
If present in the cache, the 621 * block for the referred file is returned, otherwise, this method returns null. It will also 622 * return null if the passed cache key doesn't relate to a reference. 623 * @param key the BlockCacheKey instance to look for in the cache. 624 * @return the cached block from the referred file, null if there's no such block in the cache or 625 * the passed key doesn't relate to a reference. 626 */ 627 public BucketEntry getBlockForReference(BlockCacheKey key) { 628 BucketEntry foundEntry = null; 629 String referredFileName = null; 630 if (StoreFileInfo.isReference(key.getHfileName())) { 631 referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond(); 632 } 633 if (referredFileName != null) { 634 BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset()); 635 foundEntry = backingMap.get(convertedCacheKey); 636 LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(), 637 convertedCacheKey, foundEntry); 638 } 639 return foundEntry; 640 } 641 642 /** 643 * Get the buffer of the block with the specified key. 644 * @param key block's cache key 645 * @param caching true if the caller caches blocks on cache misses 646 * @param repeat Whether this is a repeat lookup for the same block 647 * @param updateCacheMetrics Whether we should update cache metrics or not 648 * @return buffer of specified cache key, or null if not in cache 649 */ 650 @Override 651 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 652 boolean updateCacheMetrics) { 653 if (!isCacheEnabled()) { 654 return null; 655 } 656 RAMQueueEntry re = ramCache.get(key); 657 if (re != null) { 658 if (updateCacheMetrics) { 659 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 660 } 661 re.access(accessCount.incrementAndGet()); 662 return re.getData(); 663 } 664 BucketEntry bucketEntry = backingMap.get(key); 665 if (bucketEntry == null) { 666 bucketEntry = getBlockForReference(key); 667 } 668 if (bucketEntry != null) { 669 long start = System.nanoTime(); 670 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 671 try { 672 lock.readLock().lock(); 673 // We can not read here even if backingMap does contain the given key because its offset 674 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 675 // existence here. 676 if ( 677 bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key)) 678 ) { 679 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 680 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 681 // the same BucketEntry, then all of the three will share the same refCnt. 682 Cacheable cachedBlock = ioEngine.read(bucketEntry); 683 if (ioEngine.usesSharedMemory()) { 684 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 685 // same RefCnt, do retain here, in order to count the number of RPC references 686 cachedBlock.retain(); 687 } 688 // Update the cache statistics. 
689 if (updateCacheMetrics) { 690 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 691 cacheStats.ioHit(System.nanoTime() - start); 692 } 693 bucketEntry.access(accessCount.incrementAndGet()); 694 if (this.ioErrorStartTime > 0) { 695 ioErrorStartTime = -1; 696 } 697 return cachedBlock; 698 } 699 } catch (HBaseIOException hioex) { 700 // When using file io engine persistent cache, 701 // the cache map state might differ from the actual cache. If we reach this block, 702 // we should remove the cache key entry from the backing map 703 backingMap.remove(key); 704 fullyCachedFiles.remove(key.getHfileName()); 705 LOG.debug("Failed to fetch block for cache key: {}.", key, hioex); 706 } catch (IOException ioex) { 707 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 708 checkIOErrorIsTolerated(); 709 } finally { 710 lock.readLock().unlock(); 711 } 712 } 713 if (!repeat && updateCacheMetrics) { 714 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 715 } 716 return null; 717 } 718 719 /** 720 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 721 */ 722 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 723 boolean evictedByEvictionProcess) { 724 bucketEntry.markAsEvicted(); 725 blocksByHFile.remove(cacheKey); 726 if (decrementBlockNumber) { 727 this.blockNumber.decrement(); 728 if (ioEngine.isPersistent()) { 729 fileNotFullyCached(cacheKey.getHfileName()); 730 } 731 } 732 if (evictedByEvictionProcess) { 733 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 734 } 735 if (ioEngine.isPersistent()) { 736 setCacheInconsistent(true); 737 } 738 } 739 740 private void fileNotFullyCached(String hfileName) { 741 // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted 742 if (fullyCachedFiles.containsKey(hfileName)) { 743 Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName); 744 String regionEncodedName = regionEntry.getFirst(); 745 long filePrefetchSize = regionEntry.getSecond(); 746 LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName); 747 regionCachedSize.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize); 748 // If all the blocks for a region are evicted from the cache, remove the entry for that region 749 if ( 750 regionCachedSize.containsKey(regionEncodedName) 751 && regionCachedSize.get(regionEncodedName) == 0 752 ) { 753 regionCachedSize.remove(regionEncodedName); 754 } 755 } 756 fullyCachedFiles.remove(hfileName); 757 } 758 759 public void fileCacheCompleted(Path filePath, long size) { 760 Pair<String, Long> pair = new Pair<>(); 761 // sets the region name 762 String regionName = filePath.getParent().getParent().getName(); 763 pair.setFirst(regionName); 764 pair.setSecond(size); 765 fullyCachedFiles.put(filePath.getName(), pair); 766 } 767 768 private void updateRegionCachedSize(Path filePath, long cachedSize) { 769 if (filePath != null) { 770 String regionName = filePath.getParent().getParent().getName(); 771 regionCachedSize.merge(regionName, cachedSize, 772 (previousSize, newBlockSize) -> previousSize + newBlockSize); 773 } 774 } 775 776 /** 777 * Free the {{@link BucketEntry} actually,which could only be invoked when the 778 * {@link BucketEntry#refCnt} becoming 0. 
 */
  void freeBucketEntry(BucketEntry bucketEntry) {
    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
    realCacheSize.add(-1 * bucketEntry.getLength());
  }

  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * First, we'll try to remove the block from the RAMCache, and then try to evict it from the
   * backingMap. Here we evict the block from the backingMap immediately, but only free the
   * reference from the bucket cache by calling {@link BucketEntry#markedAsEvicted}. If there are
   * still RPCs referring to this block, it can only be de-allocated when all of them release it.
   * <p>
   * NOTICE: we need to grab the write offset lock first, before releasing the reference from the
   * bucket cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, which is a memory
   * leak.
   * @param cacheKey Block to evict
   * @return true if we've evicted the block successfully, false otherwise.
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return doEvictBlock(cacheKey, null, false);
  }

  /**
   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap}
   * and {@link BucketCache#ramCache}. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} will be removed.
   * @param cacheKey                 {@link BlockCacheKey} to evict.
   * @param bucketEntry              {@link BucketEntry} matching the {@link BlockCacheKey} to
   *                                 evict.
   * @param evictedByEvictionProcess true if this eviction was triggered by the cache's own
   *                                 eviction process (e.g. freeSpace); used to update eviction
   *                                 statistics.
   * @return true if we've evicted the block successfully, false otherwise.
   */
  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    boolean evictedByEvictionProcess) {
    if (!isCacheEnabled()) {
      return false;
    }
    boolean existedInRamCache = removeFromRamCache(cacheKey);
    if (bucketEntry == null) {
      bucketEntry = backingMap.get(cacheKey);
    }
    final BucketEntry bucketEntryToUse = bucketEntry;

    if (bucketEntryToUse == null) {
      if (existedInRamCache && evictedByEvictionProcess) {
        cacheStats.evicted(0, cacheKey.isPrimary());
      }
      return existedInRamCache;
    } else {
      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
          LOG.debug("removed key {} from back map with offset lock {} in the evict process",
            cacheKey, bucketEntryToUse.offset());
          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
          return true;
        }
        return false;
      });
    }
  }

  /**
   * <pre>
   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
   * 1. For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
   * it is the return value of the current {@link BucketCache#createRecycler} method.
852 * 853 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 854 * it is {@link ByteBuffAllocator#putbackBuffer}. 855 * </pre> 856 */ 857 public Recycler createRecycler(final BucketEntry bucketEntry) { 858 return () -> { 859 freeBucketEntry(bucketEntry); 860 return; 861 }; 862 } 863 864 /** 865 * NOTE: This method is only for test. 866 */ 867 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { 868 BucketEntry bucketEntry = backingMap.get(blockCacheKey); 869 if (bucketEntry == null) { 870 return false; 871 } 872 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); 873 } 874 875 /** 876 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if 877 * {@link BucketEntry#isRpcRef} is false. <br/> 878 * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 879 * {@link BucketEntry} could be removed. 880 * @param blockCacheKey {@link BlockCacheKey} to evict. 881 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 882 * @return true to indicate whether we've evicted successfully or not. 883 */ 884 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { 885 if (!bucketEntry.isRpcRef()) { 886 return doEvictBlock(blockCacheKey, bucketEntry, true); 887 } 888 return false; 889 } 890 891 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 892 return ramCache.remove(cacheKey, re -> { 893 if (re != null) { 894 this.blockNumber.decrement(); 895 this.heapSize.add(-1 * re.getData().heapSize()); 896 } 897 }); 898 } 899 900 public boolean isCacheInconsistent() { 901 return isCacheInconsistent.get(); 902 } 903 904 public void setCacheInconsistent(boolean setCacheInconsistent) { 905 isCacheInconsistent.set(setCacheInconsistent); 906 } 907 908 protected void setCacheState(CacheState state) { 909 cacheState = state; 910 } 911 912 /* 913 * Statistics thread. Periodically output cache statistics to the log. 914 */ 915 private static class StatisticsThread extends Thread { 916 private final BucketCache bucketCache; 917 918 public StatisticsThread(BucketCache bucketCache) { 919 super("BucketCacheStatsThread"); 920 setDaemon(true); 921 this.bucketCache = bucketCache; 922 } 923 924 @Override 925 public void run() { 926 bucketCache.logStats(); 927 } 928 } 929 930 public void logStats() { 931 if (!isCacheInitialized("BucketCache::logStats")) { 932 return; 933 } 934 935 long totalSize = bucketAllocator.getTotalSize(); 936 long usedSize = bucketAllocator.getUsedSize(); 937 long freeSize = totalSize - usedSize; 938 long cacheSize = getRealCacheSize(); 939 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 940 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 941 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 942 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 943 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 944 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 945 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 946 + (cacheStats.getHitCount() == 0 947 ? 
"0," 948 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 949 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 950 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 951 + (cacheStats.getHitCachingCount() == 0 952 ? "0," 953 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 954 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 955 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 956 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount=" 957 + backingMap.size()); 958 cacheStats.reset(); 959 960 bucketAllocator.logDebugStatistics(); 961 } 962 963 public long getRealCacheSize() { 964 return this.realCacheSize.sum(); 965 } 966 967 public long acceptableSize() { 968 if (!isCacheInitialized("BucketCache::acceptableSize")) { 969 return 0; 970 } 971 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 972 } 973 974 long getPartitionSize(float partitionFactor) { 975 if (!isCacheInitialized("BucketCache::getPartitionSize")) { 976 return 0; 977 } 978 979 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 980 } 981 982 /** 983 * Return the count of bucketSizeinfos still need free space 984 */ 985 private int bucketSizesAboveThresholdCount(float minFactor) { 986 if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) { 987 return 0; 988 } 989 990 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 991 int fullCount = 0; 992 for (int i = 0; i < stats.length; i++) { 993 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 994 freeGoal = Math.max(freeGoal, 1); 995 if (stats[i].freeCount() < freeGoal) { 996 fullCount++; 997 } 998 } 999 return fullCount; 1000 } 1001 1002 /** 1003 * This method will find the buckets that are minimally occupied and are not reference counted and 1004 * will free them completely without any constraint on the access times of the elements, and as a 1005 * process will completely free at most the number of buckets passed, sometimes it might not due 1006 * to changing refCounts 1007 * @param completelyFreeBucketsNeeded number of buckets to free 1008 **/ 1009 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 1010 if (!isCacheInitialized("BucketCache::freeEntireBuckets")) { 1011 return; 1012 } 1013 1014 if (completelyFreeBucketsNeeded != 0) { 1015 // First we will build a set where the offsets are reference counted, usually 1016 // this set is small around O(Handler Count) unless something else is wrong 1017 Set<Integer> inUseBuckets = new HashSet<>(); 1018 backingMap.forEach((k, be) -> { 1019 if (be.isRpcRef()) { 1020 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 1021 } 1022 }); 1023 Set<Integer> candidateBuckets = 1024 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 1025 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 1026 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 1027 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 1028 } 1029 } 1030 } 1031 } 1032 1033 /** 1034 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 1035 * allocated. 
When freeing the space, we use the LRU algorithm and ensure there must be some 1036 * blocks evicted 1037 * @param why Why we are being called 1038 */ 1039 void freeSpace(final String why) { 1040 if (!isCacheInitialized("BucketCache::freeSpace")) { 1041 return; 1042 } 1043 // Ensure only one freeSpace progress at a time 1044 if (!freeSpaceLock.tryLock()) { 1045 return; 1046 } 1047 try { 1048 freeInProgress = true; 1049 long bytesToFreeWithoutExtra = 0; 1050 // Calculate free byte for each bucketSizeinfo 1051 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 1052 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1053 long[] bytesToFreeForBucket = new long[stats.length]; 1054 for (int i = 0; i < stats.length; i++) { 1055 bytesToFreeForBucket[i] = 0; 1056 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1057 freeGoal = Math.max(freeGoal, 1); 1058 if (stats[i].freeCount() < freeGoal) { 1059 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 1060 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 1061 if (msgBuffer != null) { 1062 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 1063 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 1064 } 1065 } 1066 } 1067 if (msgBuffer != null) { 1068 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 1069 } 1070 1071 if (bytesToFreeWithoutExtra <= 0) { 1072 return; 1073 } 1074 long currentSize = bucketAllocator.getUsedSize(); 1075 long totalSize = bucketAllocator.getTotalSize(); 1076 if (LOG.isDebugEnabled() && msgBuffer != null) { 1077 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() 1078 + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 1079 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 1080 + StringUtils.byteDesc(totalSize)); 1081 } 1082 1083 long bytesToFreeWithExtra = 1084 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 1085 1086 // Instantiate priority buckets 1087 BucketEntryGroup bucketSingle = 1088 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 1089 BucketEntryGroup bucketMulti = 1090 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 1091 BucketEntryGroup bucketMemory = 1092 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 1093 1094 // Scan entire map putting bucket entry into appropriate bucket entry 1095 // group 1096 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 1097 switch (bucketEntryWithKey.getValue().getPriority()) { 1098 case SINGLE: { 1099 bucketSingle.add(bucketEntryWithKey); 1100 break; 1101 } 1102 case MULTI: { 1103 bucketMulti.add(bucketEntryWithKey); 1104 break; 1105 } 1106 case MEMORY: { 1107 bucketMemory.add(bucketEntryWithKey); 1108 break; 1109 } 1110 } 1111 } 1112 1113 PriorityQueue<BucketEntryGroup> bucketQueue = 1114 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 1115 1116 bucketQueue.add(bucketSingle); 1117 bucketQueue.add(bucketMulti); 1118 bucketQueue.add(bucketMemory); 1119 1120 int remainingBuckets = bucketQueue.size(); 1121 long bytesFreed = 0; 1122 1123 BucketEntryGroup bucketGroup; 1124 while ((bucketGroup = bucketQueue.poll()) != null) { 1125 long overflow = bucketGroup.overflow(); 1126 if (overflow > 0) { 1127 long bucketBytesToFree = 1128 Math.min(overflow, 
(bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 1129 bytesFreed += bucketGroup.free(bucketBytesToFree); 1130 } 1131 remainingBuckets--; 1132 } 1133 1134 // Check and free if there are buckets that still need freeing of space 1135 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 1136 bucketQueue.clear(); 1137 remainingBuckets = 3; 1138 1139 bucketQueue.add(bucketSingle); 1140 bucketQueue.add(bucketMulti); 1141 bucketQueue.add(bucketMemory); 1142 1143 while ((bucketGroup = bucketQueue.poll()) != null) { 1144 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 1145 bytesFreed += bucketGroup.free(bucketBytesToFree); 1146 remainingBuckets--; 1147 } 1148 } 1149 1150 // Even after the above free we might still need freeing because of the 1151 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 1152 // there might be some buckets where the occupancy is very sparse and thus are not 1153 // yielding the free for the other bucket sizes, the fix for this to evict some 1154 // of the buckets, we do this by evicting the buckets that are least fulled 1155 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 1156 1157 if (LOG.isDebugEnabled()) { 1158 long single = bucketSingle.totalSize(); 1159 long multi = bucketMulti.totalSize(); 1160 long memory = bucketMemory.totalSize(); 1161 if (LOG.isDebugEnabled()) { 1162 LOG.debug("Bucket cache free space completed; " + "freed=" 1163 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 1164 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 1165 + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 1166 } 1167 } 1168 1169 } catch (Throwable t) { 1170 LOG.warn("Failed freeing space", t); 1171 } finally { 1172 cacheStats.evict(); 1173 freeInProgress = false; 1174 freeSpaceLock.unlock(); 1175 } 1176 } 1177 1178 // This handles flushing the RAM cache to IOEngine. 1179 class WriterThread extends Thread { 1180 private final BlockingQueue<RAMQueueEntry> inputQueue; 1181 private volatile boolean writerEnabled = true; 1182 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 1183 1184 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 1185 super("BucketCacheWriterThread"); 1186 this.inputQueue = queue; 1187 } 1188 1189 // Used for test 1190 void disableWriter() { 1191 this.writerEnabled = false; 1192 } 1193 1194 @Override 1195 public void run() { 1196 List<RAMQueueEntry> entries = new ArrayList<>(); 1197 try { 1198 while (isCacheEnabled() && writerEnabled) { 1199 try { 1200 try { 1201 // Blocks 1202 entries = getRAMQueueEntries(inputQueue, entries); 1203 } catch (InterruptedException ie) { 1204 if (!isCacheEnabled() || !writerEnabled) { 1205 break; 1206 } 1207 } 1208 doDrain(entries, metaBuff); 1209 } catch (Exception ioe) { 1210 LOG.error("WriterThread encountered error", ioe); 1211 } 1212 } 1213 } catch (Throwable t) { 1214 LOG.warn("Failed doing drain", t); 1215 } 1216 LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled()); 1217 } 1218 } 1219 1220 /** 1221 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 1222 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 1223 * in ramCache, copy to io-engine and add a bucket entry to backingMap. 
A second thread caching a new block
   * with the same cache key does the same thing, so if the previous bucket entry is not evicted,
   * memory is leaked: the previous bucketEntry is gone from the maps, but the bucketAllocator has
   * not freed its space.
   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
   *      cacheKey, Cacheable newBlock)
   * @param key         Block cache key
   * @param bucketEntry Bucket entry to put into backingMap.
   */
  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
    blocksByHFile.add(key);
    updateRegionCachedSize(key.getFilePath(), bucketEntry.getLength());
    if (previousEntry != null && previousEntry != bucketEntry) {
      previousEntry.withWriteLock(offsetLock, () -> {
        blockEvicted(key, previousEntry, false, false);
        return null;
      });
    }
  }

  /**
   * Prepare and return a warning message for a Bucket Allocator Exception
   * @param fle The exception
   * @param re  The RAMQueueEntry for which the exception was thrown.
   * @return A warning message created from the input RAMQueueEntry object.
   */
  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
    final RAMQueueEntry re) {
    final StringBuilder sb = new StringBuilder();
    sb.append("Most recent failed allocation after ");
    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
    sb.append(" ms;");
    if (re != null) {
      if (re.getData() instanceof HFileBlock) {
        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
        final String tableName = Bytes.toString(fileContext.getTableName());
        if (tableName != null) {
          sb.append(" Table: ");
          sb.append(tableName);
        }
        if (columnFamily != null) {
          sb.append(" CF: ");
          sb.append(columnFamily);
        }
        sb.append(" HFile: ");
        if (fileContext.getHFileName() != null) {
          sb.append(fileContext.getHFileName());
        } else {
          sb.append(re.getKey());
        }
      } else {
        sb.append(" HFile: ");
        sb.append(re.getKey());
      }
    }
    sb.append(" Message: ");
    sb.append(fle.getMessage());
    return sb.toString();
  }

  /**
   * Flush the entries in ramCache to the IOEngine and add bucket entries to the backingMap.
   * Process all entries passed in, even on failure, being sure to remove them from ramCache;
   * otherwise we'll never undo the references and we'll OOME.
   * @param entries Presumes the list passed in here will be processed by this invocation only. No
   *                interference expected.
   */
  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
    if (entries.isEmpty()) {
      return;
    }
    // This method is a little hard to follow. We run through the passed-in entries and for each
    // successful add, we add a non-null BucketEntry to the bucketEntries array below. Later we
    // must do cleanup, making sure we've cleared ramCache of all entries regardless of whether we
    // successfully added the item to the bucket cache; if we don't do the cleanup, we'll OOME by
    // filling ramCache. We do the cleanup by again running through the passed-in entries,
    // doing extra work when we find a non-null corresponding entry in bucketEntries.
1302 final int size = entries.size(); 1303 BucketEntry[] bucketEntries = new BucketEntry[size]; 1304 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1305 // when we go to add an entry by going around the loop again without upping the index. 1306 int index = 0; 1307 while (isCacheEnabled() && index < size) { 1308 RAMQueueEntry re = null; 1309 try { 1310 re = entries.get(index); 1311 if (re == null) { 1312 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1313 index++; 1314 continue; 1315 } 1316 // Reset the position for reuse. 1317 // It should be guaranteed that the data in the metaBuff has been transferred to the 1318 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1319 // transferred with our current IOEngines. Should take care, when we have new kinds of 1320 // IOEngine in the future. 1321 metaBuff.clear(); 1322 BucketEntry bucketEntry = 1323 re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); 1324 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 1325 bucketEntries[index] = bucketEntry; 1326 if (ioErrorStartTime > 0) { 1327 ioErrorStartTime = -1; 1328 } 1329 index++; 1330 } catch (BucketAllocatorException fle) { 1331 long currTs = EnvironmentEdgeManager.currentTime(); 1332 cacheStats.allocationFailed(); // Record the warning. 1333 if ( 1334 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1335 ) { 1336 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1337 allocFailLogPrevTs = currTs; 1338 } 1339 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1340 bucketEntries[index] = null; 1341 index++; 1342 } catch (CacheFullException cfe) { 1343 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1344 if (!freeInProgress) { 1345 freeSpace("Full!"); 1346 } else { 1347 Thread.sleep(50); 1348 } 1349 } catch (IOException ioex) { 1350 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1351 LOG.error("Failed writing to bucket cache", ioex); 1352 checkIOErrorIsTolerated(); 1353 } 1354 } 1355 1356 // Make sure data pages are written on media before we update maps. 1357 try { 1358 ioEngine.sync(); 1359 } catch (IOException ioex) { 1360 LOG.error("Failed syncing IO engine", ioex); 1361 checkIOErrorIsTolerated(); 1362 // Since we failed sync, free the blocks in bucket allocator 1363 for (int i = 0; i < entries.size(); ++i) { 1364 BucketEntry bucketEntry = bucketEntries[i]; 1365 if (bucketEntry != null) { 1366 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 1367 bucketEntries[i] = null; 1368 } 1369 } 1370 } 1371 1372 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1373 // success or error. 1374 for (int i = 0; i < size; ++i) { 1375 BlockCacheKey key = entries.get(i).getKey(); 1376 // Only add if non-null entry. 1377 if (bucketEntries[i] != null) { 1378 putIntoBackingMap(key, bucketEntries[i]); 1379 if (ioEngine.isPersistent()) { 1380 setCacheInconsistent(true); 1381 } 1382 } 1383 // Always remove from ramCache even if we failed adding it to the block cache above. 1384 boolean existed = ramCache.remove(key, re -> { 1385 if (re != null) { 1386 heapSize.add(-1 * re.getData().heapSize()); 1387 } 1388 }); 1389 if (!existed && bucketEntries[i] != null) { 1390 // Block should have already been evicted. 
Remove it and free space. 1391 final BucketEntry bucketEntry = bucketEntries[i]; 1392 bucketEntry.withWriteLock(offsetLock, () -> { 1393 if (backingMap.remove(key, bucketEntry)) { 1394 blockEvicted(key, bucketEntry, false, false); 1395 } 1396 return null; 1397 }); 1398 } 1399 } 1400 1401 long used = bucketAllocator.getUsedSize(); 1402 if (used > acceptableSize()) { 1403 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1404 } 1405 return; 1406 } 1407 1408 /** 1409 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1410 * returning. 1411 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1412 * in case. 1413 * @param q The queue to take from. 1414 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1415 */ 1416 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1417 List<RAMQueueEntry> receptacle) throws InterruptedException { 1418 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1419 // ok even if list grew to accommodate thousands. 1420 receptacle.clear(); 1421 receptacle.add(q.take()); 1422 q.drainTo(receptacle); 1423 return receptacle; 1424 } 1425 1426 /** 1427 * @see #retrieveFromFile(int[]) 1428 */ 1429 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1430 justification = "false positive, try-with-resources ensures close is called.") 1431 void persistToFile() throws IOException { 1432 LOG.debug("Thread {} started persisting bucket cache to file", 1433 Thread.currentThread().getName()); 1434 if (!isCachePersistent()) { 1435 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1436 } 1437 File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); 1438 try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { 1439 LOG.debug("Persist in new chunked persistence format."); 1440 1441 persistChunkedBackingMap(fos); 1442 1443 LOG.debug( 1444 "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}," 1445 + " file name: {}", 1446 backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); 1447 } catch (IOException e) { 1448 LOG.error("Failed to persist bucket cache to file", e); 1449 throw e; 1450 } catch (Throwable e) { 1451 LOG.error("Failed during persist bucket cache to file: ", e); 1452 throw e; 1453 } 1454 LOG.debug("Thread {} finished persisting bucket cache to file, renaming", 1455 Thread.currentThread().getName()); 1456 if (!tempPersistencePath.renameTo(new File(persistencePath))) { 1457 LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if " 1458 + "RS crashes/restarts before we successfully checkpoint again."); 1459 } 1460 } 1461 1462 public boolean isCachePersistent() { 1463 return ioEngine.isPersistent() && persistencePath != null; 1464 } 1465 1466 @Override 1467 public Optional<Map<String, Long>> getRegionCachedInfo() { 1468 return Optional.of(Collections.unmodifiableMap(regionCachedSize)); 1469 } 1470 1471 /** 1472 * @see #persistToFile() 1473 */ 1474 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1475 LOG.info("Started retrieving bucket cache from file"); 1476 File persistenceFile = new File(persistencePath); 1477 if (!persistenceFile.exists()) { 1478 LOG.warn("Persistence file missing! 
" 1479 + "It's ok if it's first run after enabling persistent cache."); 1480 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1481 blockNumber.add(backingMap.size()); 1482 backingMapValidated.set(true); 1483 return; 1484 } 1485 assert !isCacheEnabled(); 1486 1487 try (FileInputStream in = new FileInputStream(persistenceFile)) { 1488 int pblen = ProtobufMagic.lengthOfPBMagic(); 1489 byte[] pbuf = new byte[pblen]; 1490 int read = in.read(pbuf); 1491 if (read != pblen) { 1492 throw new IOException("Incorrect number of bytes read while checking for protobuf magic " 1493 + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath); 1494 } 1495 if (ProtobufMagic.isPBMagicPrefix(pbuf)) { 1496 LOG.info("Reading old format of persistence."); 1497 // The old non-chunked version of backing map persistence. 1498 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1499 } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) { 1500 // The new persistence format of chunked persistence. 1501 LOG.info("Reading new chunked format of persistence."); 1502 retrieveChunkedBackingMap(in); 1503 } else { 1504 // In 3.0 we have enough flexibility to dump the old cache data. 1505 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1506 throw new IOException( 1507 "Persistence file does not start with protobuf magic number. " + persistencePath); 1508 } 1509 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1510 blockNumber.add(backingMap.size()); 1511 LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size()); 1512 } 1513 } 1514 1515 private void updateRegionSizeMapWhileRetrievingFromFile() { 1516 // Update the regionCachedSize with the region size while restarting the region server 1517 if (LOG.isDebugEnabled()) { 1518 LOG.debug("Updating region size map after retrieving cached file list"); 1519 dumpPrefetchList(); 1520 } 1521 regionCachedSize.clear(); 1522 fullyCachedFiles.forEach((hFileName, hFileSize) -> { 1523 // Get the region name for each file 1524 String regionEncodedName = hFileSize.getFirst(); 1525 long cachedFileSize = hFileSize.getSecond(); 1526 regionCachedSize.merge(regionEncodedName, cachedFileSize, 1527 (oldpf, fileSize) -> oldpf + fileSize); 1528 }); 1529 } 1530 1531 private void dumpPrefetchList() { 1532 for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { 1533 LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), 1534 outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); 1535 } 1536 } 1537 1538 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1539 throws IOException { 1540 if (capacitySize != cacheCapacity) { 1541 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1542 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1543 } 1544 if (!ioEngine.getClass().getName().equals(ioclass)) { 1545 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1546 + ioEngine.getClass().getName()); 1547 } 1548 if (!backingMap.getClass().getName().equals(mapclass)) { 1549 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1550 + backingMap.getClass().getName()); 1551 } 1552 } 1553 1554 private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) { 1555 try { 1556 if (proto.hasChecksum()) { 1557 
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1558 algorithm); 1559 } 1560 backingMapValidated.set(true); 1561 } catch (IOException e) { 1562 LOG.warn("Checksum for cache file failed. " 1563 + "We need to validate each cache key in the backing map. " 1564 + "This may take some time, so we'll do it in a background thread,"); 1565 1566 Runnable cacheValidator = () -> { 1567 while (bucketAllocator == null) { 1568 try { 1569 Thread.sleep(50); 1570 } catch (InterruptedException ex) { 1571 throw new RuntimeException(ex); 1572 } 1573 } 1574 long startTime = EnvironmentEdgeManager.currentTime(); 1575 int totalKeysOriginally = backingMap.size(); 1576 for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { 1577 try { 1578 ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); 1579 } catch (IOException e1) { 1580 LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey()); 1581 evictBlock(keyEntry.getKey()); 1582 fileNotFullyCached(keyEntry.getKey().getHfileName()); 1583 } 1584 } 1585 backingMapValidated.set(true); 1586 LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", 1587 totalKeysOriginally, backingMap.size(), 1588 (EnvironmentEdgeManager.currentTime() - startTime)); 1589 }; 1590 Thread t = new Thread(cacheValidator); 1591 t.setDaemon(true); 1592 t.start(); 1593 } 1594 } 1595 1596 private void updateCacheIndex(BucketCacheProtos.BackingMap chunk, 1597 java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException { 1598 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 = 1599 BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler); 1600 backingMap.putAll(pair2.getFirst()); 1601 blocksByHFile.addAll(pair2.getSecond()); 1602 } 1603 1604 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1605 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair = 1606 BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1607 this::createRecycler); 1608 backingMap = pair.getFirst(); 1609 blocksByHFile = pair.getSecond(); 1610 fullyCachedFiles.clear(); 1611 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); 1612 1613 LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), 1614 fullyCachedFiles.size()); 1615 1616 verifyFileIntegrity(proto); 1617 updateRegionSizeMapWhileRetrievingFromFile(); 1618 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1619 } 1620 1621 private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { 1622 LOG.debug( 1623 "persistToFile: before persisting backing map size: {}, " 1624 + "fullycachedFiles size: {}, chunkSize: {}", 1625 backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize); 1626 1627 BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize); 1628 1629 LOG.debug( 1630 "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}", 1631 backingMap.size(), fullyCachedFiles.size()); 1632 } 1633 1634 private void retrieveChunkedBackingMap(FileInputStream in) throws IOException { 1635 1636 // Read the first chunk that has all the details. 
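    // Hedged sketch of the on-disk layout consumed here (field names as used elsewhere in this
    // file; exact wire details live in BucketProtoUtils/BucketCacheProtos): the file starts with
    // the PB_MAGIC_V2 marker, then one delimited BucketCacheEntry carrying the cache capacity,
    // ioclass, mapclass, the deserializer map, the fully-cached-files map and an optional
    // checksum, followed by any number of delimited BackingMap chunks that are merged into
    // backingMap/blocksByHFile one at a time below. This path only runs for a persistent cache,
    // e.g. (typical hbase-site.xml settings, values illustrative):
    //   hbase.bucketcache.ioengine        file:/mnt/ssd/bucket.cache
    //   hbase.bucketcache.persistent.path /mnt/ssd/bucket.persistence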
1637 BucketCacheProtos.BucketCacheEntry cacheEntry = 1638 BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in); 1639 1640 fullyCachedFiles.clear(); 1641 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap())); 1642 1643 backingMap.clear(); 1644 blocksByHFile.clear(); 1645 1646 // Read the backing map entries in batches. 1647 int numChunks = 0; 1648 while (in.available() > 0) { 1649 updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in), 1650 cacheEntry.getDeserializersMap()); 1651 numChunks++; 1652 } 1653 1654 LOG.info("Retrieved {} of chunks with blockCount = {}.", numChunks, backingMap.size()); 1655 verifyFileIntegrity(cacheEntry); 1656 verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(), 1657 cacheEntry.getMapClass()); 1658 updateRegionSizeMapWhileRetrievingFromFile(); 1659 } 1660 1661 /** 1662 * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors 1663 * exceeds ioErrorsDurationTimeTolerated, we will disable the cache 1664 */ 1665 private void checkIOErrorIsTolerated() { 1666 long now = EnvironmentEdgeManager.currentTime(); 1667 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1668 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1669 if (ioErrorStartTimeTmp > 0) { 1670 if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1671 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration 1672 + "ms, disabling cache, please check your IOEngine"); 1673 disableCache(); 1674 } 1675 } else { 1676 this.ioErrorStartTime = now; 1677 } 1678 } 1679 1680 /** 1681 * Used to shut down the cache -or- turn it off in the case of something broken. 1682 */ 1683 private void disableCache() { 1684 if (!isCacheEnabled()) { 1685 return; 1686 } 1687 LOG.info("Disabling cache"); 1688 cacheState = CacheState.DISABLED; 1689 ioEngine.shutdown(); 1690 this.scheduleThreadPool.shutdown(); 1691 for (int i = 0; i < writerThreads.length; ++i) 1692 writerThreads[i].interrupt(); 1693 this.ramCache.clear(); 1694 if (!ioEngine.isPersistent() || persistencePath == null) { 1695 // If persistent ioengine and a path, we will serialize out the backingMap. 1696 this.backingMap.clear(); 1697 this.blocksByHFile.clear(); 1698 this.fullyCachedFiles.clear(); 1699 this.regionCachedSize.clear(); 1700 } 1701 } 1702 1703 private void join() throws InterruptedException { 1704 for (int i = 0; i < writerThreads.length; ++i) 1705 writerThreads[i].join(); 1706 } 1707 1708 @Override 1709 public void shutdown() { 1710 disableCache(); 1711 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" 1712 + persistencePath); 1713 if (ioEngine.isPersistent() && persistencePath != null) { 1714 try { 1715 join(); 1716 if (cachePersister != null) { 1717 LOG.info("Shutting down cache persister thread."); 1718 cachePersister.shutdown(); 1719 while (cachePersister.isAlive()) { 1720 Thread.sleep(10); 1721 } 1722 } 1723 persistToFile(); 1724 } catch (IOException ex) { 1725 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1726 } catch (InterruptedException e) { 1727 LOG.warn("Failed to persist data on exit", e); 1728 } 1729 } 1730 } 1731 1732 /** 1733 * Needed mostly for UTs that might run in the same VM and create different BucketCache instances 1734 * on different UT methods. 
1735 */ 1736 @Override 1737 protected void finalize() { 1738 if (cachePersister != null && !cachePersister.isInterrupted()) { 1739 cachePersister.interrupt(); 1740 } 1741 } 1742 1743 @Override 1744 public CacheStats getStats() { 1745 return cacheStats; 1746 } 1747 1748 public BucketAllocator getAllocator() { 1749 return this.bucketAllocator; 1750 } 1751 1752 @Override 1753 public long heapSize() { 1754 return this.heapSize.sum(); 1755 } 1756 1757 @Override 1758 public long size() { 1759 return this.realCacheSize.sum(); 1760 } 1761 1762 @Override 1763 public long getCurrentDataSize() { 1764 return size(); 1765 } 1766 1767 @Override 1768 public long getFreeSize() { 1769 if (!isCacheInitialized("BucketCache::getFreeSize")) { 1770 return 0; 1771 } 1772 return this.bucketAllocator.getFreeSize(); 1773 } 1774 1775 @Override 1776 public long getBlockCount() { 1777 return this.blockNumber.sum(); 1778 } 1779 1780 @Override 1781 public long getDataBlockCount() { 1782 return getBlockCount(); 1783 } 1784 1785 @Override 1786 public long getCurrentSize() { 1787 if (!isCacheInitialized("BucketCache::getCurrentSize")) { 1788 return 0; 1789 } 1790 return this.bucketAllocator.getUsedSize(); 1791 } 1792 1793 protected String getAlgorithm() { 1794 return algorithm; 1795 } 1796 1797 /** 1798 * Evicts all blocks for a specific HFile. 1799 * <p> 1800 * This is used for evict-on-close to remove all blocks of a specific HFile. 1801 * @return the number of blocks evicted 1802 */ 1803 @Override 1804 public int evictBlocksByHfileName(String hfileName) { 1805 return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE); 1806 } 1807 1808 @Override 1809 public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { 1810 fileNotFullyCached(hfileName); 1811 Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset); 1812 LOG.debug("Found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), 1813 hfileName, initOffset, endOffset); 1814 int numEvicted = 0; 1815 for (BlockCacheKey key : keySet) { 1816 if (evictBlock(key)) { 1817 ++numEvicted; 1818 } 1819 } 1820 return numEvicted; 1821 } 1822 1823 private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) { 1824 return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true, 1825 new BlockCacheKey(hfileName, end), true); 1826 } 1827 1828 /** 1829 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1830 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1831 * number of elements out of each according to configuration parameters and their relative sizes. 1832 */ 1833 private class BucketEntryGroup { 1834 1835 private CachedEntryQueue queue; 1836 private long totalSize = 0; 1837 private long bucketSize; 1838 1839 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1840 this.bucketSize = bucketSize; 1841 queue = new CachedEntryQueue(bytesToFree, blockSize); 1842 totalSize = 0; 1843 } 1844 1845 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1846 totalSize += block.getValue().getLength(); 1847 queue.add(block); 1848 } 1849 1850 public long free(long toFree) { 1851 Map.Entry<BlockCacheKey, BucketEntry> entry; 1852 long freedBytes = 0; 1853 // TODO avoid a cycling situation. We may find no block which is not in use and so no way to free.
1854 // What to do then? Fail the caching attempt? Need some changes in the cacheBlock API?
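      // Hedged sketch of how the eviction path is expected to use these groups (simplified;
      // bytesToFree/blockSize are the constructor arguments above, "singleSize" is an
      // illustrative stand-in for the configured partition size of the SINGLE priority):
      //   BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFree, blockSize, singleSize);
      //   for (Map.Entry<BlockCacheKey, BucketEntry> e : backingMap.entrySet()) {
      //     bucketSingle.add(e); // one group per BlockPriority: single, multi, memory
      //   }
      //   long freed = bucketSingle.free(bucketSingle.overflow());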
1855 while ((entry = queue.pollLast()) != null) { 1856 BlockCacheKey blockCacheKey = entry.getKey(); 1857 BucketEntry be = entry.getValue(); 1858 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1859 freedBytes += be.getLength(); 1860 } 1861 if (freedBytes >= toFree) { 1862 return freedBytes; 1863 } 1864 } 1865 return freedBytes; 1866 } 1867 1868 public long overflow() { 1869 return totalSize - bucketSize; 1870 } 1871 1872 public long totalSize() { 1873 return totalSize; 1874 } 1875 } 1876 1877 /** 1878 * Block Entry stored in the memory with key,data and so on 1879 */ 1880 static class RAMQueueEntry { 1881 private final BlockCacheKey key; 1882 private final Cacheable data; 1883 private long accessCounter; 1884 private boolean inMemory; 1885 private boolean isCachePersistent; 1886 1887 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 1888 boolean isCachePersistent) { 1889 this.key = bck; 1890 this.data = data; 1891 this.accessCounter = accessCounter; 1892 this.inMemory = inMemory; 1893 this.isCachePersistent = isCachePersistent; 1894 } 1895 1896 public Cacheable getData() { 1897 return data; 1898 } 1899 1900 public BlockCacheKey getKey() { 1901 return key; 1902 } 1903 1904 public void access(long accessCounter) { 1905 this.accessCounter = accessCounter; 1906 } 1907 1908 private ByteBuffAllocator getByteBuffAllocator() { 1909 if (data instanceof HFileBlock) { 1910 return ((HFileBlock) data).getByteBuffAllocator(); 1911 } 1912 return ByteBuffAllocator.HEAP; 1913 } 1914 1915 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1916 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 1917 ByteBuffer metaBuff) throws IOException { 1918 int len = data.getSerializedLength(); 1919 // This cacheable thing can't be serialized 1920 if (len == 0) { 1921 return null; 1922 } 1923 if (isCachePersistent && data instanceof HFileBlock) { 1924 len += Long.BYTES; // we need to record the cache time for consistency check in case of 1925 // recovery 1926 } 1927 long offset = alloc.allocateBlock(len); 1928 boolean succ = false; 1929 BucketEntry bucketEntry = null; 1930 try { 1931 int diskSizeWithHeader = (data instanceof HFileBlock) 1932 ? ((HFileBlock) data).getOnDiskSizeWithHeader() 1933 : data.getSerializedLength(); 1934 bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, 1935 createRecycler, getByteBuffAllocator()); 1936 bucketEntry.setDeserializerReference(data.getDeserializer()); 1937 if (data instanceof HFileBlock) { 1938 // If an instance of HFileBlock, save on some allocations. 1939 HFileBlock block = (HFileBlock) data; 1940 ByteBuff sliceBuf = block.getBufferReadOnly(); 1941 block.getMetaData(metaBuff); 1942 // adds the cache time prior to the block and metadata part 1943 if (isCachePersistent) { 1944 ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); 1945 buffer.putLong(bucketEntry.getCachedTime()); 1946 buffer.rewind(); 1947 ioEngine.write(buffer, offset); 1948 ioEngine.write(sliceBuf, (offset + Long.BYTES)); 1949 } else { 1950 ioEngine.write(sliceBuf, offset); 1951 } 1952 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 1953 } else { 1954 // Only used for testing. 
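        // (Descriptive note) This fallback serializes the whole Cacheable into a temporary
        // on-heap buffer and writes it in one call; real HFileBlocks take the branch above,
        // which writes the block payload and metadata separately so that, for a persistent
        // cache, the resulting layout at "offset" is roughly:
        //   [cachedTime: Long.BYTES][block payload][block metadata (last metaBuff.limit() bytes)]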
1955 ByteBuffer bb = ByteBuffer.allocate(len); 1956 data.serialize(bb, true); 1957 ioEngine.write(bb, offset); 1958 } 1959 succ = true; 1960 } finally { 1961 if (!succ) { 1962 alloc.freeBlock(offset, len); 1963 } 1964 } 1965 realCacheSize.add(len); 1966 return bucketEntry; 1967 } 1968 } 1969 1970 /** 1971 * Only used in test 1972 */ 1973 void stopWriterThreads() throws InterruptedException { 1974 for (WriterThread writerThread : writerThreads) { 1975 writerThread.disableWriter(); 1976 writerThread.interrupt(); 1977 writerThread.join(); 1978 } 1979 } 1980 1981 @Override 1982 public Iterator<CachedBlock> iterator() { 1983 // Don't bother with ramcache since stuff is in here only a little while. 1984 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 1985 return new Iterator<CachedBlock>() { 1986 private final long now = System.nanoTime(); 1987 1988 @Override 1989 public boolean hasNext() { 1990 return i.hasNext(); 1991 } 1992 1993 @Override 1994 public CachedBlock next() { 1995 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1996 return new CachedBlock() { 1997 @Override 1998 public String toString() { 1999 return BlockCacheUtil.toString(this, now); 2000 } 2001 2002 @Override 2003 public BlockPriority getBlockPriority() { 2004 return e.getValue().getPriority(); 2005 } 2006 2007 @Override 2008 public BlockType getBlockType() { 2009 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 2010 return null; 2011 } 2012 2013 @Override 2014 public long getOffset() { 2015 return e.getKey().getOffset(); 2016 } 2017 2018 @Override 2019 public long getSize() { 2020 return e.getValue().getLength(); 2021 } 2022 2023 @Override 2024 public long getCachedTime() { 2025 return e.getValue().getCachedTime(); 2026 } 2027 2028 @Override 2029 public String getFilename() { 2030 return e.getKey().getHfileName(); 2031 } 2032 2033 @Override 2034 public int compareTo(CachedBlock other) { 2035 int diff = this.getFilename().compareTo(other.getFilename()); 2036 if (diff != 0) return diff; 2037 2038 diff = Long.compare(this.getOffset(), other.getOffset()); 2039 if (diff != 0) return diff; 2040 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 2041 throw new IllegalStateException( 2042 "" + this.getCachedTime() + ", " + other.getCachedTime()); 2043 } 2044 return Long.compare(other.getCachedTime(), this.getCachedTime()); 2045 } 2046 2047 @Override 2048 public int hashCode() { 2049 return e.getKey().hashCode(); 2050 } 2051 2052 @Override 2053 public boolean equals(Object obj) { 2054 if (obj instanceof CachedBlock) { 2055 CachedBlock cb = (CachedBlock) obj; 2056 return compareTo(cb) == 0; 2057 } else { 2058 return false; 2059 } 2060 } 2061 }; 2062 } 2063 2064 @Override 2065 public void remove() { 2066 throw new UnsupportedOperationException(); 2067 } 2068 }; 2069 } 2070 2071 @Override 2072 public BlockCache[] getBlockCaches() { 2073 return null; 2074 } 2075 2076 public int getRpcRefCount(BlockCacheKey cacheKey) { 2077 BucketEntry bucketEntry = backingMap.get(cacheKey); 2078 if (bucketEntry != null) { 2079 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 
0 : 1); 2080 } 2081 return 0; 2082 } 2083 2084 float getAcceptableFactor() { 2085 return acceptableFactor; 2086 } 2087 2088 float getMinFactor() { 2089 return minFactor; 2090 } 2091 2092 float getExtraFreeFactor() { 2093 return extraFreeFactor; 2094 } 2095 2096 float getSingleFactor() { 2097 return singleFactor; 2098 } 2099 2100 float getMultiFactor() { 2101 return multiFactor; 2102 } 2103 2104 float getMemoryFactor() { 2105 return memoryFactor; 2106 } 2107 2108 public String getPersistencePath() { 2109 return persistencePath; 2110 } 2111 2112 /** 2113 * Wraps the delegate ConcurrentMap while maintaining its blocks' reference counts. 2114 */ 2115 static class RAMCache { 2116 /** 2117 * We define the map as a {@link ConcurrentHashMap} explicitly here, because in 2118 * {@link RAMCache#get(BlockCacheKey)} and 2119 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} we need to guarantee 2120 * the atomicity of map#computeIfPresent(key, func) and map#computeIfAbsent(key, func). Besides, the 2121 * func method must execute exactly once, only when the key is present (or absent) and under the 2122 * lock context. Otherwise, the reference count of the block will be messed up. Notice that the 2123 * {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that. 2124 */ 2125 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 2126 2127 public boolean containsKey(BlockCacheKey key) { 2128 return delegate.containsKey(key); 2129 } 2130 2131 public RAMQueueEntry get(BlockCacheKey key) { 2132 return delegate.computeIfPresent(key, (k, re) -> { 2133 // It'll be referenced by RPC, so retain atomically here. If the get and retain are not 2134 // atomic, another thread may remove and release the block; when retaining in this thread we 2135 // may retain a block with refCnt=0, which is disallowed (see HBASE-22422). 2136 re.getData().retain(); 2137 return re; 2138 }); 2139 } 2140 2141 /** 2142 * Return the previous associated value, or null if absent. It has the same meaning as 2143 * {@link ConcurrentMap#putIfAbsent(Object, Object)}. 2144 */ 2145 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 2146 AtomicBoolean absent = new AtomicBoolean(false); 2147 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 2148 // The RAMCache now references this entry, so its reference count should be incremented. 2149 entry.getData().retain(); 2150 absent.set(true); 2151 return entry; 2152 }); 2153 return absent.get() ? null : re; 2154 } 2155 2156 public boolean remove(BlockCacheKey key) { 2157 return remove(key, re -> { 2158 }); 2159 } 2160 2161 /** 2162 * We take a {@link Consumer} here because, once the removed entry releases its reference count, 2163 * its ByteBuffers may be recycled and accessing it outside this method will throw an 2164 * exception. The consumer gets to access the entry to remove before its reference count is released.
2165 * Notice, don't change its reference count in the {@link Consumer} 2166 */ 2167 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 2168 RAMQueueEntry previous = delegate.remove(key); 2169 action.accept(previous); 2170 if (previous != null) { 2171 previous.getData().release(); 2172 } 2173 return previous != null; 2174 } 2175 2176 public boolean isEmpty() { 2177 return delegate.isEmpty(); 2178 } 2179 2180 public void clear() { 2181 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 2182 while (it.hasNext()) { 2183 RAMQueueEntry re = it.next().getValue(); 2184 it.remove(); 2185 re.getData().release(); 2186 } 2187 } 2188 2189 public boolean hasBlocksForFile(String fileName) { 2190 return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName)) 2191 .findFirst().isPresent(); 2192 } 2193 } 2194 2195 public Map<BlockCacheKey, BucketEntry> getBackingMap() { 2196 return backingMap; 2197 } 2198 2199 public AtomicBoolean getBackingMapValidated() { 2200 return backingMapValidated; 2201 } 2202 2203 @Override 2204 public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { 2205 return Optional.of(fullyCachedFiles); 2206 } 2207 2208 public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) { 2209 if (cacheConf.getBlockCache().isPresent()) { 2210 BlockCache bc = cacheConf.getBlockCache().get(); 2211 if (bc instanceof CombinedBlockCache) { 2212 BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); 2213 if (l2 instanceof BucketCache) { 2214 return Optional.of((BucketCache) l2); 2215 } 2216 } else if (bc instanceof BucketCache) { 2217 return Optional.of((BucketCache) bc); 2218 } 2219 } 2220 return Optional.empty(); 2221 } 2222 2223 @Override 2224 public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount, 2225 long size) { 2226 // block eviction may be happening in the background as prefetch runs, 2227 // so we need to count all blocks for this file in the backing map under 2228 // a read lock for the block offset 2229 final List<ReentrantReadWriteLock> locks = new ArrayList<>(); 2230 LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}", 2231 fileName, totalBlockCount, dataBlockCount); 2232 try { 2233 final MutableInt count = new MutableInt(); 2234 LOG.debug("iterating over {} entries in the backing map", backingMap.size()); 2235 Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE); 2236 if (result.isEmpty() && StoreFileInfo.isReference(fileName)) { 2237 result = getAllCacheKeysForFile( 2238 StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0, 2239 Long.MAX_VALUE); 2240 } 2241 result.stream().forEach(entry -> { 2242 LOG.debug("found block for file {} in the backing map. 
Acquiring read lock for offset {}", 2243 fileName.getName(), entry.getOffset()); 2244 ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset()); 2245 lock.readLock().lock(); 2246 locks.add(lock); 2247 if (backingMap.containsKey(entry) && entry.getBlockType() == BlockType.DATA) { 2248 count.increment(); 2249 } 2250 }); 2251 int metaCount = totalBlockCount - dataBlockCount; 2252 // BucketCache would only have data blocks 2253 if (dataBlockCount == count.getValue()) { 2254 LOG.debug("File {} has now been fully cached.", fileName); 2255 fileCacheCompleted(fileName, size); 2256 } else { 2257 LOG.debug( 2258 "Prefetch executor completed for {}, but only {} data blocks were cached. " 2259 + "Total data blocks for file: {}. " 2260 + "Checking for blocks pending cache in cache writer queue.", 2261 fileName, count.getValue(), dataBlockCount); 2262 if (ramCache.hasBlocksForFile(fileName.getName())) { 2263 for (ReentrantReadWriteLock lock : locks) { 2264 lock.readLock().unlock(); 2265 } 2266 locks.clear(); 2267 LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms " 2268 + "and try the verification again.", fileName); 2269 Thread.sleep(100); 2270 notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); 2271 } else if ( 2272 (getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE).size() - metaCount) 2273 == dataBlockCount 2274 ) { 2275 LOG.debug("We counted {} data blocks, expected was {}, there were no more pending in " 2276 + "the cache write queue, but we now found that the total cached blocks for file {} " 2277 + "is equal to the data block count.", count, dataBlockCount, fileName.getName()); 2278 fileCacheCompleted(fileName, size); 2279 } else { 2280 LOG.info("We found only {} data blocks cached from a total of {} for file {}, " 2281 + "but no blocks pending caching. Maybe the cache is full or evictions " 2282 + "happened concurrently with the cache prefetch.", count, dataBlockCount, fileName); 2283 } 2284 } 2285 } catch (InterruptedException e) { 2286 throw new RuntimeException(e); 2287 } finally { 2288 for (ReentrantReadWriteLock lock : locks) { 2289 lock.readLock().unlock(); 2290 } 2291 } 2292 } 2293 2294 @Override 2295 public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) { 2296 if (!isCacheInitialized("blockFitsIntoTheCache")) { 2297 return Optional.of(false); 2298 } 2299 2300 long currentUsed = bucketAllocator.getUsedSize(); 2301 boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize(); 2302 return Optional.of(result); 2303 } 2304 2305 @Override 2306 public Optional<Boolean> shouldCacheFile(String fileName) { 2307 // If we don't have the file in fullyCachedFiles, we should cache it. 2308 return Optional.of(!fullyCachedFiles.containsKey(fileName)); 2309 } 2310 2311 @Override 2312 public Optional<Boolean> isAlreadyCached(BlockCacheKey key) { 2313 boolean foundKey = backingMap.containsKey(key); 2314 // If there's no entry for the key itself, we need to check if this key is for a reference, 2315 // and if so, look for a block from the referenced file using the getBlockForReference method. 2316 return Optional.of(foundKey ? true : getBlockForReference(key) != null); 2317 } 2318 2319 @Override 2320 public Optional<Integer> getBlockSize(BlockCacheKey key) { 2321 BucketEntry entry = backingMap.get(key); 2322 if (entry == null) { 2323 // The key might be for a reference for which we had found the block from the referenced file in 2324 // the cache when we first tried to cache it.
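    // Hedged illustration of the reference case: after a region split, a daughter region can read
    // through a reference file (named roughly "<hfileName>.<parentRegionEncodedName>") while the
    // blocks were cached under the parent's hfile name. backingMap then has no entry for the
    // reference key itself, so getBlockForReference(key) is used to probe the cache with a key
    // derived from the referred-to file, e.g. (illustrative names):
    //   BlockCacheKey refKey = new BlockCacheKey("abc123def.9a2f44c1", 0);
    //   getBlockSize(refKey); // falls through to getBlockForReference(refKey)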
2325 entry = getBlockForReference(key); 2326 return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader()); 2327 } else { 2328 return Optional.of(entry.getOnDiskSizeWithHeader()); 2329 } 2330 2331 } 2332 2333 boolean isCacheInitialized(String api) { 2334 if (cacheState == CacheState.INITIALIZING) { 2335 LOG.warn("Bucket initialisation pending at {}", api); 2336 return false; 2337 } 2338 return true; 2339 } 2340 2341 @Override 2342 public boolean waitForCacheInitialization(long timeout) { 2343 try { 2344 while (cacheState == CacheState.INITIALIZING) { 2345 if (timeout <= 0) { 2346 break; 2347 } 2348 Thread.sleep(100); 2349 timeout -= 100; 2350 } 2351 } finally { 2352 return isCacheEnabled(); 2353 } 2354 } 2355}
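// Hedged usage sketch (illustrative only, not part of the class above): a server-side component
// that holds a CacheConfig can look up the BucketCache and poll a few of the metrics it exposes:
//   Optional<BucketCache> bucketCache = BucketCache.getBucketCacheFromCacheConfig(cacheConf);
//   bucketCache.ifPresent(bc -> {
//     long usedBytes  = bc.getCurrentSize();  // bytes allocated in the IOEngine
//     long freeBytes  = bc.getFreeSize();     // bytes still available to the BucketAllocator
//     long blockCount = bc.getBlockCount();
//     CacheStats stats = bc.getStats();       // hit/miss/eviction counters
//   });
// where "cacheConf" stands for whatever CacheConfig instance the caller already has.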