@InterfaceAudience.Private public class BucketCache extends Object implements BlockCache, HeapSize
BucketCache uses BucketAllocator to allocate/free blocks, and uses
BucketCache#ramCache and BucketCache#backingMap in order to
determine if a given element is in the cache. The bucket cache can use on-heap or
off-heap memory (ByteBufferIOEngine) or a file (FileIOEngine) to
store/read the block data.
Eviction uses an algorithm similar to the one used in
LruBlockCache.
BucketCache can be used mainly as a block cache (see
CombinedBlockCache), combined with
a BlockCache to decrease CMS GC and heap fragmentation.
It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to enlarge cache space via a victim cache.
Modifier and Type | Class and Description |
---|---|
private class |
BucketCache.BucketEntryGroup
Used to group bucket entries into priority buckets.
|
(package private) static class |
BucketCache.RAMCache
Wraps the delegate ConcurrentMap while maintaining each block's reference count.
|
(package private) static class |
BucketCache.RAMQueueEntry
Block entry stored in memory with key, data and so on.
|
private static class |
BucketCache.StatisticsThread |
(package private) class |
BucketCache.WriterThread |
Modifier and Type | Field and Description |
---|---|
(package private) static String |
ACCEPT_FACTOR_CONFIG_NAME |
private float |
acceptableFactor
Acceptable size of cache (no evictions if size < acceptable)
|
private AtomicLong |
accessCount
Cache access count (sequential ID)
|
private String |
algorithm
Use the
MessageDigest class's digest algorithms to check
persistent file integrity; the default algorithm is MD5 |
(package private) ConcurrentHashMap<BlockCacheKey,BucketEntry> |
backingMap |
private LongAdder |
blockNumber
Current number of cached elements
|
private NavigableSet<BlockCacheKey> |
blocksByHFile |
private long |
blockSize
Approximate block size
|
private BucketAllocator |
bucketAllocator |
private long |
cacheCapacity |
private boolean |
cacheEnabled
Flag if the cache is enabled or not...
|
private BucketCacheStats |
cacheStats |
private static float |
DEFAULT_ACCEPT_FACTOR |
private static int |
DEFAULT_CACHE_WAIT_TIME |
static int |
DEFAULT_ERROR_TOLERATION_DURATION |
private static float |
DEFAULT_EXTRA_FREE_FACTOR |
private static String |
DEFAULT_FILE_VERIFY_ALGORITHM |
private static int |
DEFAULT_FREE_ENTIRE_BLOCK_FACTOR |
(package private) static float |
DEFAULT_MEMORY_FACTOR |
(package private) static float |
DEFAULT_MIN_FACTOR |
(package private) static float |
DEFAULT_MULTI_FACTOR |
(package private) static float |
DEFAULT_SINGLE_FACTOR
Priority buckets
|
(package private) static int |
DEFAULT_WRITER_QUEUE_ITEMS |
(package private) static int |
DEFAULT_WRITER_THREADS |
(package private) static String |
EXTRA_FREE_FACTOR_CONFIG_NAME |
private float |
extraFreeFactor
Free this floating point factor of extra blocks when evicting.
|
private static String |
FILE_VERIFY_ALGORITHM |
private boolean |
freeInProgress
Volatile boolean to track whether freeing space is in progress or not
|
private Lock |
freeSpaceLock |
private LongAdder |
heapSize |
(package private) IOEngine |
ioEngine |
private long |
ioErrorStartTime |
private int |
ioErrorsTolerationDuration
Duration of IO errors tolerated before we disable cache, 1 min as default
|
private static org.slf4j.Logger |
LOG |
(package private) static String |
MEMORY_FACTOR_CONFIG_NAME |
private float |
memoryFactor
In-memory bucket size
|
(package private) static String |
MIN_FACTOR_CONFIG_NAME |
private float |
minFactor
Minimum threshold of cache (when evicting, evict until size < min)
|
(package private) static String |
MULTI_FACTOR_CONFIG_NAME |
private float |
multiFactor
Multiple access bucket size
|
(package private) IdReadWriteLock<Long> |
offsetLock
A ReentrantReadWriteLock to lock on a particular block identified by offset.
|
private String |
persistencePath |
(package private) BucketCache.RAMCache |
ramCache |
private LongAdder |
realCacheSize |
private ScheduledExecutorService |
scheduleThreadPool
Statistics thread schedule pool (for heavy debugging, could remove)
|
(package private) static String |
SINGLE_FACTOR_CONFIG_NAME
Priority buckets config
|
private float |
singleFactor
Single access bucket size
|
private static int |
statThreadPeriod
Statistics thread
|
(package private) boolean |
wait_when_cache
Used in tests.
|
(package private) ArrayList<BlockingQueue<BucketCache.RAMQueueEntry>> |
writerQueues
A list of writer queues.
|
(package private) BucketCache.WriterThread[] |
writerThreads |
Constructor and Description |
---|
BucketCache(String ioEngineName,
long capacity,
int blockSize,
int[] bucketSizes,
int writerThreadNum,
int writerQLen,
String persistencePath) |
BucketCache(String ioEngineName,
long capacity,
int blockSize,
int[] bucketSizes,
int writerThreadNum,
int writerQLen,
String persistencePath,
int ioErrorsTolerationDuration,
org.apache.hadoop.conf.Configuration conf) |
Modifier and Type | Method and Description |
---|---|
long |
acceptableSize() |
(package private) void |
blockEvicted(BlockCacheKey cacheKey,
BucketEntry bucketEntry,
boolean decrementBlockNumber) |
private int |
bucketSizesAboveThresholdCount(float minFactor)
Return the count of bucketSizeInfos that still need free space
|
void |
cacheBlock(BlockCacheKey cacheKey,
Cacheable buf)
Cache the block with the specified name and buffer.
|
void |
cacheBlock(BlockCacheKey cacheKey,
Cacheable cachedItem,
boolean inMemory)
Cache the block with the specified name and buffer.
|
void |
cacheBlockWithWait(BlockCacheKey cacheKey,
Cacheable cachedItem,
boolean inMemory,
boolean wait)
Cache the block to ramCache
|
private void |
cacheBlockWithWaitInternal(BlockCacheKey cacheKey,
Cacheable cachedItem,
boolean inMemory,
boolean wait) |
private void |
checkIOErrorIsTolerated()
Check whether we tolerate IO error this time.
|
private ByteBuffAllocator.Recycler |
createRecycler(BlockCacheKey cacheKey) |
private FileInputStream |
deleteFileOnClose(File file)
Create an input stream that deletes the file after reading it.
|
private void |
disableCache()
Used to shut down the cache -or- turn it off in the case of something broken.
|
boolean |
evictBlock(BlockCacheKey cacheKey)
Try to evict the block from
BlockCache by force. |
int |
evictBlocksByHfileName(String hfileName)
Evicts all blocks for a specific HFile.
|
private void |
freeEntireBuckets(int completelyFreeBucketsNeeded)
This method finds the buckets that are minimally occupied
and are not reference counted, and frees them completely
without any constraint on the access times of their elements.
It will completely free at most the number of buckets
passed; sometimes it may free fewer because of changing refCounts.
|
private void |
freeSpace(String why)
Free the space if the used size reaches acceptableSize() or a block of some size
couldn't be allocated.
|
(package private) float |
getAcceptableFactor() |
protected String |
getAlgorithm() |
BucketAllocator |
getAllocator() |
Cacheable |
getBlock(BlockCacheKey key,
boolean caching,
boolean repeat,
boolean updateCacheMetrics)
Get the buffer of the block with the specified key.
|
BlockCache[] |
getBlockCaches() |
long |
getBlockCount()
Returns the number of blocks currently cached in the block cache.
|
long |
getCurrentDataSize()
Returns the occupied size of data blocks, in bytes.
|
long |
getCurrentSize()
Returns the occupied size of the block cache, in bytes.
|
long |
getDataBlockCount()
Returns the number of data blocks currently cached in the block cache.
|
(package private) float |
getExtraFreeFactor() |
long |
getFreeSize()
Returns the free size of the block cache, in bytes.
|
String |
getIoEngine() |
private IOEngine |
getIOEngineFromName(String ioEngineName,
long capacity,
String persistencePath)
Get the IOEngine from the IO engine name
|
long |
getMaxSize()
Returns the Max size of the block cache, in bytes.
|
(package private) float |
getMemoryFactor() |
(package private) float |
getMinFactor() |
(package private) float |
getMultiFactor() |
(package private) long |
getPartitionSize(float partitionFactor) |
(package private) static List<BucketCache.RAMQueueEntry> |
getRAMQueueEntries(BlockingQueue<BucketCache.RAMQueueEntry> q,
List<BucketCache.RAMQueueEntry> receptacle)
Blocks until elements are available in
q, then tries to grab as many as possible before
returning. |
long |
getRealCacheSize() |
int |
getRpcRefCount(BlockCacheKey cacheKey) |
(package private) float |
getSingleFactor() |
CacheStats |
getStats()
Get the statistics for this block cache.
|
long |
heapSize() |
(package private) boolean |
isCacheEnabled() |
Iterator<CachedBlock> |
iterator() |
private void |
join() |
void |
logStats() |
private void |
parsePB(org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos.BucketCacheEntry proto) |
private void |
persistToFile() |
private boolean |
removeFromRamCache(BlockCacheKey cacheKey) |
private void |
retrieveFromFile(int[] bucketSizes) |
private void |
sanityCheckConfigs() |
void |
shutdown()
Shutdown the cache.
|
long |
size()
Returns the total size of the block cache, in bytes.
|
protected void |
startWriterThreads()
Called by the constructor to start the writer threads.
|
(package private) void |
stopWriterThreads()
Only used in test
|
private void |
verifyCapacityAndClasses(long capacitySize,
String ioclass,
String mapclass) |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
forEach, spliterator
private static final org.slf4j.Logger LOG
static final String SINGLE_FACTOR_CONFIG_NAME
static final String MULTI_FACTOR_CONFIG_NAME
static final String MEMORY_FACTOR_CONFIG_NAME
static final String EXTRA_FREE_FACTOR_CONFIG_NAME
static final String ACCEPT_FACTOR_CONFIG_NAME
static final String MIN_FACTOR_CONFIG_NAME
static final float DEFAULT_SINGLE_FACTOR
static final float DEFAULT_MULTI_FACTOR
static final float DEFAULT_MEMORY_FACTOR
static final float DEFAULT_MIN_FACTOR
private static final float DEFAULT_EXTRA_FREE_FACTOR
private static final float DEFAULT_ACCEPT_FACTOR
private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR
private static final int statThreadPeriod
static final int DEFAULT_WRITER_THREADS
static final int DEFAULT_WRITER_QUEUE_ITEMS
final transient BucketCache.RAMCache ramCache
transient ConcurrentHashMap<BlockCacheKey,BucketEntry> backingMap
private volatile boolean cacheEnabled
final transient ArrayList<BlockingQueue<BucketCache.RAMQueueEntry>> writerQueues
A list of writer queues. We have a queue per
BucketCache.WriterThread
we have running.
In other words, the work adding blocks to the BucketCache is divided up amongst the
running WriterThreads. It's done by taking the hash of the cache key modulo the queue count.
A WriterThread, when it runs, takes whatever has been recently added and 'drains' the entries
to the BucketCache. It then updates the ramCache and backingMap accordingly.
final transient BucketCache.WriterThread[] writerThreads
private volatile boolean freeInProgress
private final transient Lock freeSpaceLock
private final LongAdder realCacheSize
private final LongAdder blockNumber
private final AtomicLong accessCount
private static final int DEFAULT_CACHE_WAIT_TIME
boolean wait_when_cache
private final BucketCacheStats cacheStats
private final String persistencePath
private final long cacheCapacity
private final long blockSize
private final int ioErrorsTolerationDuration
public static final int DEFAULT_ERROR_TOLERATION_DURATION
private volatile long ioErrorStartTime
final transient IdReadWriteLock<Long> offsetLock
The key set of offsets in BucketCache is limited, so a soft reference is the best choice here.
private final NavigableSet<BlockCacheKey> blocksByHFile
private final transient ScheduledExecutorService scheduleThreadPool
private transient BucketAllocator bucketAllocator
private float acceptableFactor
private float minFactor
private float extraFreeFactor
private float singleFactor
private float multiFactor
private float memoryFactor
private static final String FILE_VERIFY_ALGORITHM
private static final String DEFAULT_FILE_VERIFY_ALGORITHM
private String algorithm
Use the MessageDigest
class's digest algorithms to check
persistent file integrity; the default algorithm is MD5.
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws IOException
IOException
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, org.apache.hadoop.conf.Configuration conf) throws IOException
IOException
private void sanityCheckConfigs()
protected void startWriterThreads()
boolean isCacheEnabled()
public long getMaxSize()
BlockCache
getMaxSize
in interface BlockCache
public String getIoEngine()
private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) throws IOException
Parameters: ioEngineName, capacity, persistencePath
Throws: IOException
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf)
cacheBlock
in interface BlockCache
cacheKey - block's cache key
buf - block buffer
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory)
cacheBlock
in interface BlockCache
cacheKey - block's cache key
cachedItem - block buffer
inMemory - if block is in-memory
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait)
cacheKey - block's cache key
cachedItem - block buffer
inMemory - if block is in-memory
wait - if true, blocking wait when queue is full
private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait)
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics)
getBlock
in interface BlockCache
key - block's cache key
caching - true if the caller caches blocks on cache misses
repeat - Whether this is a repeat lookup for the same block
updateCacheMetrics - Whether we should update cache metrics or not
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber)
public boolean evictBlock(BlockCacheKey cacheKey)
Try to evict the block from BlockCache
by force. We'll call this in a few cases, for example when Admin.clearBlockCache(TableName)
is called to clear all blocks for a given table.
First, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, we then try
to evict it from backingMap. Here we only need to free the reference from the bucket cache by calling
BucketEntry.markedAsEvicted. If there are still some RPCs referring to this block, the block can
only be de-allocated when all of them release the block.
NOTICE: we need to grab the write offset lock first, before releasing the reference from the
bucket cache. If we don't, we may read a BucketEntry
with refCnt = 0 in
getBlock(BlockCacheKey, boolean, boolean, boolean),
which is a memory leak.
evictBlock
in interface BlockCache
cacheKey - Block to evict
private ByteBuffAllocator.Recycler createRecycler(BlockCacheKey cacheKey)
private boolean removeFromRamCache(BlockCacheKey cacheKey)
public void logStats()
public long getRealCacheSize()
public long acceptableSize()
long getPartitionSize(float partitionFactor)
private int bucketSizesAboveThresholdCount(float minFactor)
private void freeEntireBuckets(int completelyFreeBucketsNeeded)
completelyFreeBucketsNeeded - number of buckets to free
private void freeSpace(String why)
why - Why we are being called
static List<BucketCache.RAMQueueEntry> getRAMQueueEntries(BlockingQueue<BucketCache.RAMQueueEntry> q, List<BucketCache.RAMQueueEntry> receptacle) throws InterruptedException
Blocks until elements are available in q, then tries to grab as many as possible before
returning.
receptacle - Where to stash the elements taken from queue. We clear before we use it just
in case.
q - The queue to take from.
Returns: receptacle laden with elements taken from the queue or empty if none found.
Throws: InterruptedException
private void persistToFile() throws IOException
IOException
retrieveFromFile(int[])
private void retrieveFromFile(int[] bucketSizes) throws IOException
IOException
persistToFile()
private FileInputStream deleteFileOnClose(File file) throws IOException
File f = ...
try (FileInputStream fis = new FileInputStream(f)) {
  // use the input stream
} finally {
  if (!f.delete()) throw new IOException("failed to delete");
}
file - the file to read and delete
IOException - if there is a problem creating the stream
private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) throws IOException
IOException
private void parsePB(org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos.BucketCacheEntry proto) throws IOException
IOException
private void checkIOErrorIsTolerated()
private void disableCache()
private void join() throws InterruptedException
InterruptedException
public void shutdown()
BlockCache
shutdown
in interface BlockCache
public CacheStats getStats()
BlockCache
getStats
in interface BlockCache
public BucketAllocator getAllocator()
public long heapSize()
public long size()
BlockCache
size
in interface BlockCache
public long getCurrentDataSize()
BlockCache
getCurrentDataSize
in interface BlockCache
public long getFreeSize()
BlockCache
getFreeSize
in interface BlockCache
public long getBlockCount()
BlockCache
getBlockCount
in interface BlockCache
public long getDataBlockCount()
BlockCache
getDataBlockCount
in interface BlockCache
public long getCurrentSize()
BlockCache
getCurrentSize
in interface BlockCache
protected String getAlgorithm()
public int evictBlocksByHfileName(String hfileName)
This is used for evict-on-close to remove all blocks of a specific HFile.
evictBlocksByHfileName
in interface BlockCache
void stopWriterThreads() throws InterruptedException
InterruptedException
public Iterator<CachedBlock> iterator()
iterator
in interface Iterable<CachedBlock>
iterator
in interface BlockCache
public BlockCache[] getBlockCaches()
getBlockCaches
in interface BlockCache
public int getRpcRefCount(BlockCacheKey cacheKey)
float getAcceptableFactor()
float getMinFactor()
float getExtraFreeFactor()
float getSingleFactor()
float getMultiFactor()
float getMemoryFactor()
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.