001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021package org.apache.hadoop.hbase.io.hfile.bucket; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.Comparator; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.NavigableSet; 035import java.util.PriorityQueue; 036import java.util.Set; 037import java.util.concurrent.ArrayBlockingQueue; 038import java.util.concurrent.BlockingQueue; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.ConcurrentSkipListSet; 042import java.util.concurrent.Executors; 043import java.util.concurrent.ScheduledExecutorService; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.atomic.AtomicBoolean; 046import java.util.concurrent.atomic.AtomicLong; 047import java.util.concurrent.atomic.LongAdder; 048import java.util.concurrent.locks.Lock; 049import java.util.concurrent.locks.ReentrantLock; 050import java.util.concurrent.locks.ReentrantReadWriteLock; 051import java.util.function.Consumer; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.Admin; 056import org.apache.hadoop.hbase.io.ByteBuffAllocator; 057import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 058import org.apache.hadoop.hbase.io.HeapSize; 059import org.apache.hadoop.hbase.io.hfile.BlockCache; 060import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 061import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 062import org.apache.hadoop.hbase.io.hfile.BlockPriority; 063import org.apache.hadoop.hbase.io.hfile.BlockType; 064import org.apache.hadoop.hbase.io.hfile.CacheStats; 065import org.apache.hadoop.hbase.io.hfile.Cacheable; 066import org.apache.hadoop.hbase.io.hfile.CachedBlock; 067import org.apache.hadoop.hbase.io.hfile.HFileBlock; 068import org.apache.hadoop.hbase.nio.ByteBuff; 069import org.apache.hadoop.hbase.nio.RefCnt; 070import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.IdReadWriteLock; 073import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; 074import org.apache.hadoop.util.StringUtils; 075import org.apache.yetus.audience.InterfaceAudience; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 080import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 081 082import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 083 084/** 085 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses 086 * BucketCache#ramCache and BucketCache#backingMap in order to 087 * determine if a given element is in the cache. The bucket cache can use on-heap or 088 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to 089 * store/read the block data. 090 * 091 * <p>Eviction is via a similar algorithm as used in 092 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 093 * 094 * <p>BucketCache can be used as mainly a block cache (see 095 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with 096 * a BlockCache to decrease CMS GC and heap fragmentation. 097 * 098 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store 099 * blocks) to enlarge cache space via a victim cache. 100 */ 101@InterfaceAudience.Private 102public class BucketCache implements BlockCache, HeapSize { 103 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 104 105 /** Priority buckets config */ 106 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 107 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 108 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 109 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 110 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 111 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 112 113 /** Priority buckets */ 114 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 115 static final float DEFAULT_MULTI_FACTOR = 0.50f; 116 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 117 static final float DEFAULT_MIN_FACTOR = 0.85f; 118 119 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 120 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 121 122 // Number of blocks to clear for each of the bucket size that is full 123 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 124 125 /** Statistics thread */ 126 private static final int statThreadPeriod = 5 * 60; 127 128 final static int DEFAULT_WRITER_THREADS = 3; 129 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 130 131 // Store/read block data 132 transient final IOEngine ioEngine; 133 134 // Store the block in this map before writing it to cache 135 transient final RAMCache ramCache; 136 // In this map, store the block's meta data like offset, length 137 transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap; 138 139 /** 140 * Flag if the cache is enabled or not... We shut it off if there are IO 141 * errors for some time, so that Bucket IO exceptions/errors don't bring down 142 * the HBase server. 143 */ 144 private volatile boolean cacheEnabled; 145 146 /** 147 * A list of writer queues. We have a queue per {@link WriterThread} we have running. 148 * In other words, the work adding blocks to the BucketCache is divided up amongst the 149 * running WriterThreads. Its done by taking hash of the cache key modulo queue count. 150 * WriterThread when it runs takes whatever has been recently added and 'drains' the entries 151 * to the BucketCache. It then updates the ramCache and backingMap accordingly. 152 */ 153 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 154 transient final WriterThread[] writerThreads; 155 156 /** Volatile boolean to track if free space is in process or not */ 157 private volatile boolean freeInProgress = false; 158 private transient final Lock freeSpaceLock = new ReentrantLock(); 159 160 private final LongAdder realCacheSize = new LongAdder(); 161 private final LongAdder heapSize = new LongAdder(); 162 /** Current number of cached elements */ 163 private final LongAdder blockNumber = new LongAdder(); 164 165 /** Cache access count (sequential ID) */ 166 private final AtomicLong accessCount = new AtomicLong(); 167 168 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 169 170 /** 171 * Used in tests. If this flag is false and the cache speed is very fast, 172 * bucket cache will skip some blocks when caching. If the flag is true, we 173 * will wait until blocks are flushed to IOEngine. 174 */ 175 boolean wait_when_cache = false; 176 177 private final BucketCacheStats cacheStats = new BucketCacheStats(); 178 179 private final String persistencePath; 180 private final long cacheCapacity; 181 /** Approximate block size */ 182 private final long blockSize; 183 184 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 185 private final int ioErrorsTolerationDuration; 186 // 1 min 187 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 188 189 // Start time of first IO error when reading or writing IO Engine, it will be 190 // reset after a successful read/write. 191 private volatile long ioErrorStartTime = -1; 192 193 /** 194 * A ReentrantReadWriteLock to lock on a particular block identified by offset. 195 * The purpose of this is to avoid freeing the block which is being read. 196 * <p> 197 * Key set of offsets in BucketCache is limited so soft reference is the best choice here. 198 */ 199 transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT); 200 201 private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> { 202 int nameComparison = a.getHfileName().compareTo(b.getHfileName()); 203 if (nameComparison != 0) { 204 return nameComparison; 205 } 206 return Long.compare(a.getOffset(), b.getOffset()); 207 }); 208 209 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 210 private transient final ScheduledExecutorService scheduleThreadPool = 211 Executors.newScheduledThreadPool(1, 212 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 213 214 // Allocate or free space for the block 215 private transient BucketAllocator bucketAllocator; 216 217 /** Acceptable size of cache (no evictions if size < acceptable) */ 218 private float acceptableFactor; 219 220 /** Minimum threshold of cache (when evicting, evict until size < min) */ 221 private float minFactor; 222 223 /** Free this floating point factor of extra blocks when evicting. For example free the number of blocks requested * (1 + extraFreeFactor) */ 224 private float extraFreeFactor; 225 226 /** Single access bucket size */ 227 private float singleFactor; 228 229 /** Multiple access bucket size */ 230 private float multiFactor; 231 232 /** In-memory bucket size */ 233 private float memoryFactor; 234 235 private static final String FILE_VERIFY_ALGORITHM = 236 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 237 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 238 239 /** 240 * Use {@link java.security.MessageDigest} class's encryption algorithms to check 241 * persistent file integrity, default algorithm is MD5 242 * */ 243 private String algorithm; 244 245 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 246 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 247 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 248 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 249 } 250 251 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 252 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 253 Configuration conf) throws IOException { 254 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 255 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 256 this.writerThreads = new WriterThread[writerThreadNum]; 257 long blockNumCapacity = capacity / blockSize; 258 if (blockNumCapacity >= Integer.MAX_VALUE) { 259 // Enough for about 32TB of cache! 260 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 261 } 262 263 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 264 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 265 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 266 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 267 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 268 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 269 270 sanityCheckConfigs(); 271 272 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor + 273 ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor + 274 ", memoryFactor: " + memoryFactor); 275 276 this.cacheCapacity = capacity; 277 this.persistencePath = persistencePath; 278 this.blockSize = blockSize; 279 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 280 281 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 282 for (int i = 0; i < writerThreads.length; ++i) { 283 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 284 } 285 286 assert writerQueues.size() == writerThreads.length; 287 this.ramCache = new RAMCache(); 288 289 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 290 291 if (ioEngine.isPersistent() && persistencePath != null) { 292 try { 293 retrieveFromFile(bucketSizes); 294 } catch (IOException ioex) { 295 LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex); 296 } 297 } 298 final String threadName = Thread.currentThread().getName(); 299 this.cacheEnabled = true; 300 for (int i = 0; i < writerThreads.length; ++i) { 301 writerThreads[i] = new WriterThread(writerQueues.get(i)); 302 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 303 writerThreads[i].setDaemon(true); 304 } 305 startWriterThreads(); 306 307 // Run the statistics thread periodically to print the cache statistics log 308 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 309 // every five minutes. 310 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), 311 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); 312 LOG.info("Started bucket cache; ioengine=" + ioEngineName + 313 ", capacity=" + StringUtils.byteDesc(capacity) + 314 ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + 315 writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + 316 persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 317 } 318 319 private void sanityCheckConfigs() { 320 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 321 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 322 Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 323 Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 324 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 325 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 326 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 327 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " + 328 MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 329 } 330 331 /** 332 * Called by the constructor to start the writer threads. Used by tests that need to override 333 * starting the threads. 334 */ 335 protected void startWriterThreads() { 336 for (WriterThread thread : writerThreads) { 337 thread.start(); 338 } 339 } 340 341 boolean isCacheEnabled() { 342 return this.cacheEnabled; 343 } 344 345 @Override 346 public long getMaxSize() { 347 return this.cacheCapacity; 348 } 349 350 public String getIoEngine() { 351 return ioEngine.toString(); 352 } 353 354 /** 355 * Get the IOEngine from the IO engine name 356 * @param ioEngineName 357 * @param capacity 358 * @param persistencePath 359 * @return the IOEngine 360 * @throws IOException 361 */ 362 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 363 throws IOException { 364 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 365 // In order to make the usage simple, we only need the prefix 'files:' in 366 // document whether one or multiple file(s), but also support 'file:' for 367 // the compatibility 368 String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1) 369 .split(FileIOEngine.FILE_DELIMITER); 370 return new FileIOEngine(capacity, persistencePath != null, filePaths); 371 } else if (ioEngineName.startsWith("offheap")) { 372 return new ByteBufferIOEngine(capacity); 373 } else if (ioEngineName.startsWith("mmap:")) { 374 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 375 } else if (ioEngineName.startsWith("pmem:")) { 376 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 377 // device. Since the persistent memory device has its own address space the contents 378 // mapped to this address space does not get swapped out like in the case of mmapping 379 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 380 // can be directly referred to without having to copy them onheap. Once the RPC is done, 381 // the blocks can be returned back as in case of ByteBufferIOEngine. 382 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 383 } else { 384 throw new IllegalArgumentException( 385 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 386 } 387 } 388 389 /** 390 * Cache the block with the specified name and buffer. 391 * @param cacheKey block's cache key 392 * @param buf block buffer 393 */ 394 @Override 395 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 396 cacheBlock(cacheKey, buf, false); 397 } 398 399 /** 400 * Cache the block with the specified name and buffer. 401 * @param cacheKey block's cache key 402 * @param cachedItem block buffer 403 * @param inMemory if block is in-memory 404 */ 405 @Override 406 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 407 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); 408 } 409 410 /** 411 * Cache the block to ramCache 412 * @param cacheKey block's cache key 413 * @param cachedItem block buffer 414 * @param inMemory if block is in-memory 415 * @param wait if true, blocking wait when queue is full 416 */ 417 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 418 boolean wait) { 419 if (cacheEnabled) { 420 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 421 if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { 422 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 423 } 424 } else { 425 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 426 } 427 } 428 } 429 430 private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 431 boolean inMemory, boolean wait) { 432 if (!cacheEnabled) { 433 return; 434 } 435 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 436 // Stuff the entry into the RAM cache so it can get drained to the persistent store 437 RAMQueueEntry re = 438 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory, 439 createRecycler(cacheKey)); 440 /** 441 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 442 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 443 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 444 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 445 * one, then the heap size will mess up (HBASE-20789) 446 */ 447 if (ramCache.putIfAbsent(cacheKey, re) != null) { 448 return; 449 } 450 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 451 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 452 boolean successfulAddition = false; 453 if (wait) { 454 try { 455 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); 456 } catch (InterruptedException e) { 457 Thread.currentThread().interrupt(); 458 } 459 } else { 460 successfulAddition = bq.offer(re); 461 } 462 if (!successfulAddition) { 463 ramCache.remove(cacheKey); 464 cacheStats.failInsert(); 465 } else { 466 this.blockNumber.increment(); 467 this.heapSize.add(cachedItem.heapSize()); 468 blocksByHFile.add(cacheKey); 469 } 470 } 471 472 /** 473 * Get the buffer of the block with the specified key. 474 * @param key block's cache key 475 * @param caching true if the caller caches blocks on cache misses 476 * @param repeat Whether this is a repeat lookup for the same block 477 * @param updateCacheMetrics Whether we should update cache metrics or not 478 * @return buffer of specified cache key, or null if not in cache 479 */ 480 @Override 481 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 482 boolean updateCacheMetrics) { 483 if (!cacheEnabled) { 484 return null; 485 } 486 RAMQueueEntry re = ramCache.get(key); 487 if (re != null) { 488 if (updateCacheMetrics) { 489 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 490 } 491 re.access(accessCount.incrementAndGet()); 492 return re.getData(); 493 } 494 BucketEntry bucketEntry = backingMap.get(key); 495 if (bucketEntry != null) { 496 long start = System.nanoTime(); 497 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 498 try { 499 lock.readLock().lock(); 500 // We can not read here even if backingMap does contain the given key because its offset 501 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 502 // existence here. 503 if (bucketEntry.equals(backingMap.get(key))) { 504 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 505 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 506 // the same BucketEntry, then all of the three will share the same refCnt. 507 Cacheable cachedBlock = ioEngine.read(bucketEntry); 508 if (ioEngine.usesSharedMemory()) { 509 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 510 // same RefCnt, do retain here, in order to count the number of RPC references 511 cachedBlock.retain(); 512 } 513 // Update the cache statistics. 514 if (updateCacheMetrics) { 515 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 516 cacheStats.ioHit(System.nanoTime() - start); 517 } 518 bucketEntry.access(accessCount.incrementAndGet()); 519 if (this.ioErrorStartTime > 0) { 520 ioErrorStartTime = -1; 521 } 522 return cachedBlock; 523 } 524 } catch (IOException ioex) { 525 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 526 checkIOErrorIsTolerated(); 527 } finally { 528 lock.readLock().unlock(); 529 } 530 } 531 if (!repeat && updateCacheMetrics) { 532 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 533 } 534 return null; 535 } 536 537 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { 538 bucketAllocator.freeBlock(bucketEntry.offset()); 539 realCacheSize.add(-1 * bucketEntry.getLength()); 540 blocksByHFile.remove(cacheKey); 541 if (decrementBlockNumber) { 542 this.blockNumber.decrement(); 543 } 544 } 545 546 /** 547 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 548 * 1. Close an HFile, and clear all cached blocks. <br> 549 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 550 * <p> 551 * Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try 552 * to evict from backingMap. Here we only need to free the reference from bucket cache by calling 553 * {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can 554 * only be de-allocated when all of them release the block. 555 * <p> 556 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 557 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 558 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 559 * @param cacheKey Block to evict 560 * @return true to indicate whether we've evicted successfully or not. 561 */ 562 @Override 563 public boolean evictBlock(BlockCacheKey cacheKey) { 564 if (!cacheEnabled) { 565 return false; 566 } 567 boolean existed = removeFromRamCache(cacheKey); 568 BucketEntry be = backingMap.get(cacheKey); 569 if (be == null) { 570 if (existed) { 571 cacheStats.evicted(0, cacheKey.isPrimary()); 572 } 573 return existed; 574 } else { 575 return be.withWriteLock(offsetLock, be::markAsEvicted); 576 } 577 } 578 579 private Recycler createRecycler(BlockCacheKey cacheKey) { 580 return () -> { 581 if (!cacheEnabled) { 582 return; 583 } 584 boolean existed = removeFromRamCache(cacheKey); 585 BucketEntry be = backingMap.get(cacheKey); 586 if (be == null && existed) { 587 cacheStats.evicted(0, cacheKey.isPrimary()); 588 } else if (be != null) { 589 be.withWriteLock(offsetLock, () -> { 590 if (backingMap.remove(cacheKey, be)) { 591 blockEvicted(cacheKey, be, !existed); 592 cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary()); 593 } 594 return null; 595 }); 596 } 597 }; 598 } 599 600 private boolean removeFromRamCache(BlockCacheKey cacheKey) { 601 return ramCache.remove(cacheKey, re -> { 602 if (re != null) { 603 this.blockNumber.decrement(); 604 this.heapSize.add(-1 * re.getData().heapSize()); 605 } 606 }); 607 } 608 609 /* 610 * Statistics thread. Periodically output cache statistics to the log. 611 */ 612 private static class StatisticsThread extends Thread { 613 private final BucketCache bucketCache; 614 615 public StatisticsThread(BucketCache bucketCache) { 616 super("BucketCacheStatsThread"); 617 setDaemon(true); 618 this.bucketCache = bucketCache; 619 } 620 621 @Override 622 public void run() { 623 bucketCache.logStats(); 624 } 625 } 626 627 public void logStats() { 628 long totalSize = bucketAllocator.getTotalSize(); 629 long usedSize = bucketAllocator.getUsedSize(); 630 long freeSize = totalSize - usedSize; 631 long cacheSize = getRealCacheSize(); 632 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + 633 "totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 634 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 635 "usedSize=" + StringUtils.byteDesc(usedSize) +", " + 636 "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + 637 "accesses=" + cacheStats.getRequestCount() + ", " + 638 "hits=" + cacheStats.getHitCount() + ", " + 639 "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + 640 "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + 641 "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : 642 (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + 643 "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + 644 "cachingHits=" + cacheStats.getHitCachingCount() + ", " + 645 "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : 646 (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + 647 "evictions=" + cacheStats.getEvictionCount() + ", " + 648 "evicted=" + cacheStats.getEvictedCount() + ", " + 649 "evictedPerRun=" + cacheStats.evictedPerEviction()); 650 cacheStats.reset(); 651 } 652 653 public long getRealCacheSize() { 654 return this.realCacheSize.sum(); 655 } 656 657 public long acceptableSize() { 658 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 659 } 660 661 long getPartitionSize(float partitionFactor) { 662 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 663 } 664 665 /** 666 * Return the count of bucketSizeinfos still need free space 667 */ 668 private int bucketSizesAboveThresholdCount(float minFactor) { 669 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 670 int fullCount = 0; 671 for (int i = 0; i < stats.length; i++) { 672 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 673 freeGoal = Math.max(freeGoal, 1); 674 if (stats[i].freeCount() < freeGoal) { 675 fullCount++; 676 } 677 } 678 return fullCount; 679 } 680 681 /** 682 * This method will find the buckets that are minimally occupied 683 * and are not reference counted and will free them completely 684 * without any constraint on the access times of the elements, 685 * and as a process will completely free at most the number of buckets 686 * passed, sometimes it might not due to changing refCounts 687 * 688 * @param completelyFreeBucketsNeeded number of buckets to free 689 **/ 690 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 691 if (completelyFreeBucketsNeeded != 0) { 692 // First we will build a set where the offsets are reference counted, usually 693 // this set is small around O(Handler Count) unless something else is wrong 694 Set<Integer> inUseBuckets = new HashSet<>(); 695 backingMap.forEach((k, be) -> { 696 if (be.isRpcRef()) { 697 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 698 } 699 }); 700 Set<Integer> candidateBuckets = 701 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 702 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 703 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 704 entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted); 705 } 706 } 707 } 708 } 709 710 /** 711 * Free the space if the used size reaches acceptableSize() or one size block 712 * couldn't be allocated. When freeing the space, we use the LRU algorithm and 713 * ensure there must be some blocks evicted 714 * @param why Why we are being called 715 */ 716 private void freeSpace(final String why) { 717 // Ensure only one freeSpace progress at a time 718 if (!freeSpaceLock.tryLock()) { 719 return; 720 } 721 try { 722 freeInProgress = true; 723 long bytesToFreeWithoutExtra = 0; 724 // Calculate free byte for each bucketSizeinfo 725 StringBuilder msgBuffer = LOG.isDebugEnabled()? new StringBuilder(): null; 726 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 727 long[] bytesToFreeForBucket = new long[stats.length]; 728 for (int i = 0; i < stats.length; i++) { 729 bytesToFreeForBucket[i] = 0; 730 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 731 freeGoal = Math.max(freeGoal, 1); 732 if (stats[i].freeCount() < freeGoal) { 733 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 734 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 735 if (msgBuffer != null) { 736 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 737 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 738 } 739 } 740 } 741 if (msgBuffer != null) { 742 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 743 } 744 745 if (bytesToFreeWithoutExtra <= 0) { 746 return; 747 } 748 long currentSize = bucketAllocator.getUsedSize(); 749 long totalSize = bucketAllocator.getTotalSize(); 750 if (LOG.isDebugEnabled() && msgBuffer != null) { 751 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + 752 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + 753 StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize)); 754 } 755 756 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra 757 * (1 + extraFreeFactor)); 758 759 // Instantiate priority buckets 760 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, 761 blockSize, getPartitionSize(singleFactor)); 762 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, 763 blockSize, getPartitionSize(multiFactor)); 764 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, 765 blockSize, getPartitionSize(memoryFactor)); 766 767 // Scan entire map putting bucket entry into appropriate bucket entry 768 // group 769 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 770 switch (bucketEntryWithKey.getValue().getPriority()) { 771 case SINGLE: { 772 bucketSingle.add(bucketEntryWithKey); 773 break; 774 } 775 case MULTI: { 776 bucketMulti.add(bucketEntryWithKey); 777 break; 778 } 779 case MEMORY: { 780 bucketMemory.add(bucketEntryWithKey); 781 break; 782 } 783 } 784 } 785 786 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<>(3, 787 Comparator.comparingLong(BucketEntryGroup::overflow)); 788 789 bucketQueue.add(bucketSingle); 790 bucketQueue.add(bucketMulti); 791 bucketQueue.add(bucketMemory); 792 793 int remainingBuckets = bucketQueue.size(); 794 long bytesFreed = 0; 795 796 BucketEntryGroup bucketGroup; 797 while ((bucketGroup = bucketQueue.poll()) != null) { 798 long overflow = bucketGroup.overflow(); 799 if (overflow > 0) { 800 long bucketBytesToFree = Math.min(overflow, 801 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 802 bytesFreed += bucketGroup.free(bucketBytesToFree); 803 } 804 remainingBuckets--; 805 } 806 807 // Check and free if there are buckets that still need freeing of space 808 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 809 bucketQueue.clear(); 810 remainingBuckets = 3; 811 812 bucketQueue.add(bucketSingle); 813 bucketQueue.add(bucketMulti); 814 bucketQueue.add(bucketMemory); 815 816 while ((bucketGroup = bucketQueue.poll()) != null) { 817 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 818 bytesFreed += bucketGroup.free(bucketBytesToFree); 819 remainingBuckets--; 820 } 821 } 822 823 // Even after the above free we might still need freeing because of the 824 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 825 // there might be some buckets where the occupancy is very sparse and thus are not 826 // yielding the free for the other bucket sizes, the fix for this to evict some 827 // of the buckets, we do this by evicting the buckets that are least fulled 828 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * 829 bucketSizesAboveThresholdCount(1.0f)); 830 831 if (LOG.isDebugEnabled()) { 832 long single = bucketSingle.totalSize(); 833 long multi = bucketMulti.totalSize(); 834 long memory = bucketMemory.totalSize(); 835 if (LOG.isDebugEnabled()) { 836 LOG.debug("Bucket cache free space completed; " + "freed=" 837 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" 838 + StringUtils.byteDesc(totalSize) + ", " + "single=" 839 + StringUtils.byteDesc(single) + ", " + "multi=" 840 + StringUtils.byteDesc(multi) + ", " + "memory=" 841 + StringUtils.byteDesc(memory)); 842 } 843 } 844 845 } catch (Throwable t) { 846 LOG.warn("Failed freeing space", t); 847 } finally { 848 cacheStats.evict(); 849 freeInProgress = false; 850 freeSpaceLock.unlock(); 851 } 852 } 853 854 // This handles flushing the RAM cache to IOEngine. 855 class WriterThread extends Thread { 856 private final BlockingQueue<RAMQueueEntry> inputQueue; 857 private volatile boolean writerEnabled = true; 858 859 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 860 super("BucketCacheWriterThread"); 861 this.inputQueue = queue; 862 } 863 864 // Used for test 865 void disableWriter() { 866 this.writerEnabled = false; 867 } 868 869 @Override 870 public void run() { 871 List<RAMQueueEntry> entries = new ArrayList<>(); 872 try { 873 while (cacheEnabled && writerEnabled) { 874 try { 875 try { 876 // Blocks 877 entries = getRAMQueueEntries(inputQueue, entries); 878 } catch (InterruptedException ie) { 879 if (!cacheEnabled || !writerEnabled) { 880 break; 881 } 882 } 883 doDrain(entries); 884 } catch (Exception ioe) { 885 LOG.error("WriterThread encountered error", ioe); 886 } 887 } 888 } catch (Throwable t) { 889 LOG.warn("Failed doing drain", t); 890 } 891 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 892 } 893 894 /** 895 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 896 * cache with a new block for the same cache key. there's a corner case: one thread cache a 897 * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another 898 * new block with the same cache key do the same thing for the same cache key, so if not evict 899 * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone 900 * but the bucketAllocator do not free its memory. 901 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 902 * cacheKey, Cacheable newBlock) 903 * @param key Block cache key 904 * @param bucketEntry Bucket entry to put into backingMap. 905 */ 906 private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 907 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 908 if (previousEntry != null && previousEntry != bucketEntry) { 909 previousEntry.withWriteLock(offsetLock, () -> { 910 blockEvicted(key, previousEntry, false); 911 return null; 912 }); 913 } 914 } 915 916 /** 917 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. 918 * Process all that are passed in even if failure being sure to remove from ramCache else we'll 919 * never undo the references and we'll OOME. 920 * @param entries Presumes list passed in here will be processed by this invocation only. No 921 * interference expected. 922 * @throws InterruptedException 923 */ 924 void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException { 925 if (entries.isEmpty()) { 926 return; 927 } 928 // This method is a little hard to follow. We run through the passed in entries and for each 929 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 930 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 931 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 932 // filling ramCache. We do the clean up by again running through the passed in entries 933 // doing extra work when we find a non-null bucketEntries corresponding entry. 934 final int size = entries.size(); 935 BucketEntry[] bucketEntries = new BucketEntry[size]; 936 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 937 // when we go to add an entry by going around the loop again without upping the index. 938 int index = 0; 939 while (cacheEnabled && index < size) { 940 RAMQueueEntry re = null; 941 try { 942 re = entries.get(index); 943 if (re == null) { 944 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 945 index++; 946 continue; 947 } 948 BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize); 949 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 950 bucketEntries[index] = bucketEntry; 951 if (ioErrorStartTime > 0) { 952 ioErrorStartTime = -1; 953 } 954 index++; 955 } catch (BucketAllocatorException fle) { 956 LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle); 957 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 958 bucketEntries[index] = null; 959 index++; 960 } catch (CacheFullException cfe) { 961 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 962 if (!freeInProgress) { 963 freeSpace("Full!"); 964 } else { 965 Thread.sleep(50); 966 } 967 } catch (IOException ioex) { 968 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 969 LOG.error("Failed writing to bucket cache", ioex); 970 checkIOErrorIsTolerated(); 971 } 972 } 973 974 // Make sure data pages are written on media before we update maps. 975 try { 976 ioEngine.sync(); 977 } catch (IOException ioex) { 978 LOG.error("Failed syncing IO engine", ioex); 979 checkIOErrorIsTolerated(); 980 // Since we failed sync, free the blocks in bucket allocator 981 for (int i = 0; i < entries.size(); ++i) { 982 if (bucketEntries[i] != null) { 983 bucketAllocator.freeBlock(bucketEntries[i].offset()); 984 bucketEntries[i] = null; 985 } 986 } 987 } 988 989 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 990 // success or error. 991 for (int i = 0; i < size; ++i) { 992 BlockCacheKey key = entries.get(i).getKey(); 993 // Only add if non-null entry. 994 if (bucketEntries[i] != null) { 995 putIntoBackingMap(key, bucketEntries[i]); 996 } 997 // Always remove from ramCache even if we failed adding it to the block cache above. 998 boolean existed = ramCache.remove(key, re -> { 999 if (re != null) { 1000 heapSize.add(-1 * re.getData().heapSize()); 1001 } 1002 }); 1003 if (!existed && bucketEntries[i] != null) { 1004 // Block should have already been evicted. Remove it and free space. 1005 final BucketEntry bucketEntry = bucketEntries[i]; 1006 bucketEntry.withWriteLock(offsetLock, () -> { 1007 if (backingMap.remove(key, bucketEntry)) { 1008 blockEvicted(key, bucketEntry, false); 1009 } 1010 return null; 1011 }); 1012 } 1013 } 1014 1015 long used = bucketAllocator.getUsedSize(); 1016 if (used > acceptableSize()) { 1017 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1018 } 1019 return; 1020 } 1021 } 1022 1023 /** 1024 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1025 * returning. 1026 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1027 * in case. 1028 * @param q The queue to take from. 1029 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1030 */ 1031 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1032 List<RAMQueueEntry> receptacle) throws InterruptedException { 1033 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1034 // ok even if list grew to accommodate thousands. 1035 receptacle.clear(); 1036 receptacle.add(q.take()); 1037 q.drainTo(receptacle); 1038 return receptacle; 1039 } 1040 1041 /** 1042 * @see #retrieveFromFile(int[]) 1043 */ 1044 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="OBL_UNSATISFIED_OBLIGATION", 1045 justification = "false positive, try-with-resources ensures close is called.") 1046 private void persistToFile() throws IOException { 1047 assert !cacheEnabled; 1048 if (!ioEngine.isPersistent()) { 1049 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1050 } 1051 try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) { 1052 fos.write(ProtobufMagic.PB_MAGIC); 1053 BucketProtoUtils.toPB(this).writeDelimitedTo(fos); 1054 } 1055 } 1056 1057 /** 1058 * @see #persistToFile() 1059 */ 1060 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1061 File persistenceFile = new File(persistencePath); 1062 if (!persistenceFile.exists()) { 1063 return; 1064 } 1065 assert !cacheEnabled; 1066 1067 try (FileInputStream in = deleteFileOnClose(persistenceFile)) { 1068 int pblen = ProtobufMagic.lengthOfPBMagic(); 1069 byte[] pbuf = new byte[pblen]; 1070 int read = in.read(pbuf); 1071 if (read != pblen) { 1072 throw new IOException("Incorrect number of bytes read while checking for protobuf magic " 1073 + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath); 1074 } 1075 if (! ProtobufMagic.isPBMagicPrefix(pbuf)) { 1076 // In 3.0 we have enough flexibility to dump the old cache data. 1077 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1078 throw new IOException("Persistence file does not start with protobuf magic number. " + 1079 persistencePath); 1080 } 1081 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1082 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1083 blockNumber.add(backingMap.size()); 1084 } 1085 } 1086 1087 /** 1088 * Create an input stream that deletes the file after reading it. Use in try-with-resources to 1089 * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: 1090 * <pre> 1091 * File f = ... 1092 * try (FileInputStream fis = new FileInputStream(f)) { 1093 * // use the input stream 1094 * } finally { 1095 * if (!f.delete()) throw new IOException("failed to delete"); 1096 * } 1097 * </pre> 1098 * @param file the file to read and delete 1099 * @return a FileInputStream for the given file 1100 * @throws IOException if there is a problem creating the stream 1101 */ 1102 private FileInputStream deleteFileOnClose(final File file) throws IOException { 1103 return new FileInputStream(file) { 1104 @Override 1105 public void close() throws IOException { 1106 super.close(); 1107 if (!file.delete()) { 1108 throw new IOException("Failed deleting persistence file " + file.getAbsolutePath()); 1109 } 1110 } 1111 }; 1112 } 1113 1114 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1115 throws IOException { 1116 if (capacitySize != cacheCapacity) { 1117 throw new IOException("Mismatched cache capacity:" 1118 + StringUtils.byteDesc(capacitySize) + ", expected: " 1119 + StringUtils.byteDesc(cacheCapacity)); 1120 } 1121 if (!ioEngine.getClass().getName().equals(ioclass)) { 1122 throw new IOException("Class name for IO engine mismatch: " + ioclass 1123 + ", expected:" + ioEngine.getClass().getName()); 1124 } 1125 if (!backingMap.getClass().getName().equals(mapclass)) { 1126 throw new IOException("Class name for cache map mismatch: " + mapclass 1127 + ", expected:" + backingMap.getClass().getName()); 1128 } 1129 } 1130 1131 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1132 if (proto.hasChecksum()) { 1133 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1134 algorithm); 1135 } else { 1136 // if has not checksum, it means the persistence file is old format 1137 LOG.info("Persistent file is old format, it does not support verifying file integrity!"); 1138 } 1139 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1140 backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap()); 1141 } 1142 1143 /** 1144 * Check whether we tolerate IO error this time. If the duration of IOEngine 1145 * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the 1146 * cache 1147 */ 1148 private void checkIOErrorIsTolerated() { 1149 long now = EnvironmentEdgeManager.currentTime(); 1150 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1151 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1152 if (ioErrorStartTimeTmp > 0) { 1153 if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1154 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration + 1155 "ms, disabling cache, please check your IOEngine"); 1156 disableCache(); 1157 } 1158 } else { 1159 this.ioErrorStartTime = now; 1160 } 1161 } 1162 1163 /** 1164 * Used to shut down the cache -or- turn it off in the case of something broken. 1165 */ 1166 private void disableCache() { 1167 if (!cacheEnabled) return; 1168 cacheEnabled = false; 1169 ioEngine.shutdown(); 1170 this.scheduleThreadPool.shutdown(); 1171 for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt(); 1172 this.ramCache.clear(); 1173 if (!ioEngine.isPersistent() || persistencePath == null) { 1174 // If persistent ioengine and a path, we will serialize out the backingMap. 1175 this.backingMap.clear(); 1176 } 1177 } 1178 1179 private void join() throws InterruptedException { 1180 for (int i = 0; i < writerThreads.length; ++i) 1181 writerThreads[i].join(); 1182 } 1183 1184 @Override 1185 public void shutdown() { 1186 disableCache(); 1187 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() 1188 + "; path to write=" + persistencePath); 1189 if (ioEngine.isPersistent() && persistencePath != null) { 1190 try { 1191 join(); 1192 persistToFile(); 1193 } catch (IOException ex) { 1194 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1195 } catch (InterruptedException e) { 1196 LOG.warn("Failed to persist data on exit", e); 1197 } 1198 } 1199 } 1200 1201 @Override 1202 public CacheStats getStats() { 1203 return cacheStats; 1204 } 1205 1206 public BucketAllocator getAllocator() { 1207 return this.bucketAllocator; 1208 } 1209 1210 @Override 1211 public long heapSize() { 1212 return this.heapSize.sum(); 1213 } 1214 1215 @Override 1216 public long size() { 1217 return this.realCacheSize.sum(); 1218 } 1219 1220 @Override 1221 public long getCurrentDataSize() { 1222 return size(); 1223 } 1224 1225 @Override 1226 public long getFreeSize() { 1227 return this.bucketAllocator.getFreeSize(); 1228 } 1229 1230 @Override 1231 public long getBlockCount() { 1232 return this.blockNumber.sum(); 1233 } 1234 1235 @Override 1236 public long getDataBlockCount() { 1237 return getBlockCount(); 1238 } 1239 1240 @Override 1241 public long getCurrentSize() { 1242 return this.bucketAllocator.getUsedSize(); 1243 } 1244 1245 protected String getAlgorithm() { 1246 return algorithm; 1247 } 1248 1249 /** 1250 * Evicts all blocks for a specific HFile. 1251 * <p> 1252 * This is used for evict-on-close to remove all blocks of a specific HFile. 1253 * 1254 * @return the number of blocks evicted 1255 */ 1256 @Override 1257 public int evictBlocksByHfileName(String hfileName) { 1258 Set<BlockCacheKey> keySet = blocksByHFile.subSet( 1259 new BlockCacheKey(hfileName, Long.MIN_VALUE), true, 1260 new BlockCacheKey(hfileName, Long.MAX_VALUE), true); 1261 1262 int numEvicted = 0; 1263 for (BlockCacheKey key : keySet) { 1264 if (evictBlock(key)) { 1265 ++numEvicted; 1266 } 1267 } 1268 1269 return numEvicted; 1270 } 1271 1272 /** 1273 * Used to group bucket entries into priority buckets. There will be a 1274 * BucketEntryGroup for each priority (single, multi, memory). Once bucketed, 1275 * the eviction algorithm takes the appropriate number of elements out of each 1276 * according to configuration parameters and their relative sizes. 1277 */ 1278 private class BucketEntryGroup { 1279 1280 private CachedEntryQueue queue; 1281 private long totalSize = 0; 1282 private long bucketSize; 1283 1284 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1285 this.bucketSize = bucketSize; 1286 queue = new CachedEntryQueue(bytesToFree, blockSize); 1287 totalSize = 0; 1288 } 1289 1290 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1291 totalSize += block.getValue().getLength(); 1292 queue.add(block); 1293 } 1294 1295 public long free(long toFree) { 1296 Map.Entry<BlockCacheKey, BucketEntry> entry; 1297 long freedBytes = 0; 1298 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1299 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 1300 while ((entry = queue.pollLast()) != null) { 1301 BucketEntry be = entry.getValue(); 1302 if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) { 1303 freedBytes += be.getLength(); 1304 } 1305 if (freedBytes >= toFree) { 1306 return freedBytes; 1307 } 1308 } 1309 return freedBytes; 1310 } 1311 1312 public long overflow() { 1313 return totalSize - bucketSize; 1314 } 1315 1316 public long totalSize() { 1317 return totalSize; 1318 } 1319 } 1320 1321 /** 1322 * Block Entry stored in the memory with key,data and so on 1323 */ 1324 static class RAMQueueEntry { 1325 private final BlockCacheKey key; 1326 private final Cacheable data; 1327 private long accessCounter; 1328 private boolean inMemory; 1329 private final Recycler recycler; 1330 1331 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 1332 Recycler recycler) { 1333 this.key = bck; 1334 this.data = data; 1335 this.accessCounter = accessCounter; 1336 this.inMemory = inMemory; 1337 this.recycler = recycler; 1338 } 1339 1340 public Cacheable getData() { 1341 return data; 1342 } 1343 1344 public BlockCacheKey getKey() { 1345 return key; 1346 } 1347 1348 public void access(long accessCounter) { 1349 this.accessCounter = accessCounter; 1350 } 1351 1352 private ByteBuffAllocator getByteBuffAllocator() { 1353 if (data instanceof HFileBlock) { 1354 return ((HFileBlock) data).getByteBuffAllocator(); 1355 } 1356 return ByteBuffAllocator.HEAP; 1357 } 1358 1359 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1360 final LongAdder realCacheSize) throws IOException { 1361 int len = data.getSerializedLength(); 1362 // This cacheable thing can't be serialized 1363 if (len == 0) { 1364 return null; 1365 } 1366 long offset = alloc.allocateBlock(len); 1367 boolean succ = false; 1368 BucketEntry bucketEntry = null; 1369 try { 1370 bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler), 1371 getByteBuffAllocator()); 1372 bucketEntry.setDeserializerReference(data.getDeserializer()); 1373 if (data instanceof HFileBlock) { 1374 // If an instance of HFileBlock, save on some allocations. 1375 HFileBlock block = (HFileBlock) data; 1376 ByteBuff sliceBuf = block.getBufferReadOnly(); 1377 ByteBuffer metadata = block.getMetaData(); 1378 ioEngine.write(sliceBuf, offset); 1379 ioEngine.write(metadata, offset + len - metadata.limit()); 1380 } else { 1381 // Only used for testing. 1382 ByteBuffer bb = ByteBuffer.allocate(len); 1383 data.serialize(bb, true); 1384 ioEngine.write(bb, offset); 1385 } 1386 succ = true; 1387 } finally { 1388 if (!succ) { 1389 alloc.freeBlock(offset); 1390 } 1391 } 1392 realCacheSize.add(len); 1393 return bucketEntry; 1394 } 1395 } 1396 1397 /** 1398 * Only used in test 1399 * @throws InterruptedException 1400 */ 1401 void stopWriterThreads() throws InterruptedException { 1402 for (WriterThread writerThread : writerThreads) { 1403 writerThread.disableWriter(); 1404 writerThread.interrupt(); 1405 writerThread.join(); 1406 } 1407 } 1408 1409 @Override 1410 public Iterator<CachedBlock> iterator() { 1411 // Don't bother with ramcache since stuff is in here only a little while. 1412 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = 1413 this.backingMap.entrySet().iterator(); 1414 return new Iterator<CachedBlock>() { 1415 private final long now = System.nanoTime(); 1416 1417 @Override 1418 public boolean hasNext() { 1419 return i.hasNext(); 1420 } 1421 1422 @Override 1423 public CachedBlock next() { 1424 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1425 return new CachedBlock() { 1426 @Override 1427 public String toString() { 1428 return BlockCacheUtil.toString(this, now); 1429 } 1430 1431 @Override 1432 public BlockPriority getBlockPriority() { 1433 return e.getValue().getPriority(); 1434 } 1435 1436 @Override 1437 public BlockType getBlockType() { 1438 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 1439 return null; 1440 } 1441 1442 @Override 1443 public long getOffset() { 1444 return e.getKey().getOffset(); 1445 } 1446 1447 @Override 1448 public long getSize() { 1449 return e.getValue().getLength(); 1450 } 1451 1452 @Override 1453 public long getCachedTime() { 1454 return e.getValue().getCachedTime(); 1455 } 1456 1457 @Override 1458 public String getFilename() { 1459 return e.getKey().getHfileName(); 1460 } 1461 1462 @Override 1463 public int compareTo(CachedBlock other) { 1464 int diff = this.getFilename().compareTo(other.getFilename()); 1465 if (diff != 0) return diff; 1466 1467 diff = Long.compare(this.getOffset(), other.getOffset()); 1468 if (diff != 0) return diff; 1469 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1470 throw new IllegalStateException("" + this.getCachedTime() + ", " + 1471 other.getCachedTime()); 1472 } 1473 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1474 } 1475 1476 @Override 1477 public int hashCode() { 1478 return e.getKey().hashCode(); 1479 } 1480 1481 @Override 1482 public boolean equals(Object obj) { 1483 if (obj instanceof CachedBlock) { 1484 CachedBlock cb = (CachedBlock)obj; 1485 return compareTo(cb) == 0; 1486 } else { 1487 return false; 1488 } 1489 } 1490 }; 1491 } 1492 1493 @Override 1494 public void remove() { 1495 throw new UnsupportedOperationException(); 1496 } 1497 }; 1498 } 1499 1500 @Override 1501 public BlockCache[] getBlockCaches() { 1502 return null; 1503 } 1504 1505 public int getRpcRefCount(BlockCacheKey cacheKey) { 1506 BucketEntry bucketEntry = backingMap.get(cacheKey); 1507 if (bucketEntry != null) { 1508 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1); 1509 } 1510 return 0; 1511 } 1512 1513 float getAcceptableFactor() { 1514 return acceptableFactor; 1515 } 1516 1517 float getMinFactor() { 1518 return minFactor; 1519 } 1520 1521 float getExtraFreeFactor() { 1522 return extraFreeFactor; 1523 } 1524 1525 float getSingleFactor() { 1526 return singleFactor; 1527 } 1528 1529 float getMultiFactor() { 1530 return multiFactor; 1531 } 1532 1533 float getMemoryFactor() { 1534 return memoryFactor; 1535 } 1536 1537 /** 1538 * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. 1539 */ 1540 static class RAMCache { 1541 /** 1542 * Defined the map as {@link ConcurrentHashMap} explicitly here, because in 1543 * {@link RAMCache#get(BlockCacheKey)} and 1544 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to 1545 * guarantee the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). 1546 * Besides, the func method can execute exactly once only when the key is present(or absent) 1547 * and under the lock context. Otherwise, the reference count of block will be messed up. 1548 * Notice that the {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 1549 */ 1550 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 1551 1552 public boolean containsKey(BlockCacheKey key) { 1553 return delegate.containsKey(key); 1554 } 1555 1556 public RAMQueueEntry get(BlockCacheKey key) { 1557 return delegate.computeIfPresent(key, (k, re) -> { 1558 // It'll be referenced by RPC, so retain atomically here. if the get and retain is not 1559 // atomic, another thread may remove and release the block, when retaining in this thread we 1560 // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422) 1561 re.getData().retain(); 1562 return re; 1563 }); 1564 } 1565 1566 /** 1567 * Return the previous associated value, or null if absent. It has the same meaning as 1568 * {@link ConcurrentMap#putIfAbsent(Object, Object)} 1569 */ 1570 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 1571 AtomicBoolean absent = new AtomicBoolean(false); 1572 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 1573 // The RAMCache reference to this entry, so reference count should be increment. 1574 entry.getData().retain(); 1575 absent.set(true); 1576 return entry; 1577 }); 1578 return absent.get() ? null : re; 1579 } 1580 1581 public boolean remove(BlockCacheKey key) { 1582 return remove(key, re->{}); 1583 } 1584 1585 /** 1586 * Defined an {@link Consumer} here, because once the removed entry release its reference count, 1587 * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an 1588 * exception. the consumer will access entry to remove before release its reference count. 1589 * Notice, don't change its reference count in the {@link Consumer} 1590 */ 1591 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 1592 RAMQueueEntry previous = delegate.remove(key); 1593 action.accept(previous); 1594 if (previous != null) { 1595 previous.getData().release(); 1596 } 1597 return previous != null; 1598 } 1599 1600 public boolean isEmpty() { 1601 return delegate.isEmpty(); 1602 } 1603 1604 public void clear() { 1605 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 1606 while (it.hasNext()) { 1607 RAMQueueEntry re = it.next().getValue(); 1608 it.remove(); 1609 re.getData().release(); 1610 } 1611 } 1612 } 1613}