@InterfaceAudience.Private public class BucketCache extends Object implements BlockCache, HeapSize
BucketCache uses BucketAllocator to allocate and free blocks, and uses ramCache and backingMap to determine whether a given element is in the cache. The bucket cache can store and read block data in on-heap or off-heap memory (ByteBufferIOEngine) or in a file (FileIOEngine).
Eviction uses an algorithm similar to the one in LruBlockCache.
BucketCache can be used mainly as a block cache (see CombinedBlockCache), combined with LruBlockCache to decrease CMS GC and heap fragmentation.
It can also be used as a secondary cache (e.g. using a file on SSD/FusionIO to store blocks) to enlarge cache space via LruBlockCache.setVictimCache(org.apache.hadoop.hbase.io.hfile.BlockCache).
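As a rough illustration of how two maps can answer "is this element cached?", the sketch below uses plain strings in place of BlockCacheKey, RAMQueueEntry and BucketEntry. The class name TwoLevelLookupSketch and its methods are hypothetical, and the lookup order (ramCache first, then backingMap) is an assumption rather than something this page states.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the two lookup maps; not the HBase implementation.
class TwoLevelLookupSketch {
    // Blocks accepted for caching but not yet written to the IOEngine.
    final ConcurrentMap<String, String> ramCache = new ConcurrentHashMap<>();
    // Blocks already written; the value stands in for a BucketEntry.
    final ConcurrentMap<String, String> backingMap = new ConcurrentHashMap<>();

    // Returns "ram", "bucket", or null on a miss (assumed lookup order).
    String locate(String key) {
        if (ramCache.containsKey(key)) {
            return "ram";
        }
        if (backingMap.containsKey(key)) {
            return "bucket";
        }
        return null;
    }

    // Simulates a writer draining one entry: publish to backingMap first,
    // then remove from ramCache, so a concurrent lookup never misses the block.
    void drain(String key, String bucketEntry) {
        backingMap.put(key, bucketEntry);
        ramCache.remove(key);
    }
}
```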
Modifier and Type | Class and Description |
---|---|
(package private) static class | BucketCache.BucketEntry: Item in cache. |
private class | BucketCache.BucketEntryGroup: Used to group bucket entries into priority buckets. |
(package private) static class | BucketCache.RAMQueueEntry: Block entry stored in memory with key, data, and so on. |
private static class | BucketCache.StatisticsThread |
(package private) class | BucketCache.WriterThread |
Modifier and Type | Field and Description |
---|---|
private AtomicLong | accessCount: Cache access count (sequential ID) |
(package private) ConcurrentMap<BlockCacheKey,BucketCache.BucketEntry> | backingMap |
private AtomicLong | blockNumber: Current number of cached elements |
private ConcurrentIndex<String,BlockCacheKey> | blocksByHFile |
private long | blockSize: Approximate block size |
private BucketAllocator | bucketAllocator |
private long | cacheCapacity |
private boolean | cacheEnabled: Flag if the cache is enabled or not... |
private BucketCacheStats | cacheStats |
private static float | DEFAULT_ACCEPT_FACTOR |
private static int | DEFAULT_CACHE_WAIT_TIME |
static int | DEFAULT_ERROR_TOLERATION_DURATION |
private static float | DEFAULT_EXTRA_FREE_FACTOR |
private static int | DEFAULT_FREE_ENTIRE_BLOCK_FACTOR |
private static float | DEFAULT_MEMORY_FACTOR |
private static float | DEFAULT_MIN_FACTOR |
private static float | DEFAULT_MULTI_FACTOR |
private static float | DEFAULT_SINGLE_FACTOR: Priority buckets |
(package private) static int | DEFAULT_WRITER_QUEUE_ITEMS |
(package private) static int | DEFAULT_WRITER_THREADS |
private UniqueIndexMap<Integer> | deserialiserMap |
private boolean | freeInProgress: Volatile boolean to track whether freeing space is in progress |
private Lock | freeSpaceLock |
private AtomicLong | heapSize |
(package private) IOEngine | ioEngine |
private long | ioErrorStartTime |
private int | ioErrorsTolerationDuration: Duration of IO errors tolerated before we disable cache, 1 min as default |
private static org.apache.commons.logging.Log | LOG |
(package private) IdReadWriteLock | offsetLock: A ReentrantReadWriteLock to lock on a particular block identified by offset. |
private String | persistencePath |
(package private) ConcurrentMap<BlockCacheKey,BucketCache.RAMQueueEntry> | ramCache |
private AtomicLong | realCacheSize |
private ScheduledExecutorService | scheduleThreadPool: Statistics thread schedule pool (for heavy debugging, could remove) |
private static int | statThreadPeriod: Statistics thread |
(package private) boolean | wait_when_cache |
(package private) ArrayList<BlockingQueue<BucketCache.RAMQueueEntry>> | writerQueues: A list of writer queues. |
(package private) BucketCache.WriterThread[] | writerThreads |
Constructor and Description |
---|
BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) |
BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) |
Modifier and Type | Method and Description |
---|---|
private long | acceptableSize() |
(package private) void | blockEvicted(BlockCacheKey cacheKey, BucketCache.BucketEntry bucketEntry, boolean decrementBlockNumber) |
private int | bucketSizesAboveThresholdCount(float minFactor): Returns the count of bucketSizeinfos that still need free space. |
void | cacheBlock(BlockCacheKey cacheKey, Cacheable buf): Cache the block with the specified name and buffer. |
void | cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean cacheDataInL1): Cache the block with the specified name and buffer. |
void | cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait): Cache the block to ramCache. |
private void | checkIOErrorIsTolerated(): Check whether we tolerate IO error this time. |
private void | disableCache(): Used to shut down the cache, or to turn it off when something is broken. |
boolean | evictBlock(BlockCacheKey cacheKey): Evict block from cache. |
int | evictBlocksByHfileName(String hfileName): Evicts all blocks for a specific HFile. |
private void | freeEntireBuckets(int completelyFreeBucketsNeeded): Finds the buckets that are minimally occupied and not reference counted, and frees them completely without any constraint on the access times of their elements. It frees at most the number of buckets passed; it may free fewer because refCounts can change. |
private void | freeSpace(String why): Free the space if the used size reaches acceptableSize() or a block of one size couldn't be allocated. |
BucketAllocator | getAllocator() |
Cacheable | getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics): Get the buffer of the block with the specified key. |
BlockCache[] | getBlockCaches() |
long | getBlockCount(): Returns the number of blocks currently cached in the block cache. |
long | getCurrentSize(): Returns the occupied size of the block cache, in bytes. |
long | getFreeSize(): Returns the free size of the block cache, in bytes. |
String | getIoEngine() |
private IOEngine | getIOEngineFromName(String ioEngineName, long capacity): Get the IOEngine from the IO engine name. |
long | getMaxSize() |
(package private) static List<BucketCache.RAMQueueEntry> | getRAMQueueEntries(BlockingQueue<BucketCache.RAMQueueEntry> q, List<BucketCache.RAMQueueEntry> receptical): Blocks until elements are available in q, then tries to grab as many as possible before returning. |
long | getRealCacheSize() |
CacheStats | getStats(): Get the statistics for this block cache. |
long | heapSize() |
(package private) boolean | isCacheEnabled() |
Iterator<CachedBlock> | iterator() |
private void | join() |
void | logStats() |
private long | memorySize() |
private long | multiSize() |
private void | persistToFile() |
private void | retrieveFromFile(int[] bucketSizes) |
void | shutdown(): Shutdown the cache. |
private long | singleSize() |
long | size(): Returns the total size of the block cache, in bytes. |
protected void | startWriterThreads(): Called by the constructor to start the writer threads. |
(package private) void | stopWriterThreads(): Only used in test. |
private static final org.apache.commons.logging.Log LOG
private static final float DEFAULT_SINGLE_FACTOR
private static final float DEFAULT_MULTI_FACTOR
private static final float DEFAULT_MEMORY_FACTOR
private static final float DEFAULT_EXTRA_FREE_FACTOR
private static final float DEFAULT_ACCEPT_FACTOR
private static final float DEFAULT_MIN_FACTOR
private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR
private static final int statThreadPeriod
static final int DEFAULT_WRITER_THREADS
static final int DEFAULT_WRITER_QUEUE_ITEMS
final IOEngine ioEngine
final ConcurrentMap<BlockCacheKey,BucketCache.RAMQueueEntry> ramCache
ConcurrentMap<BlockCacheKey,BucketCache.BucketEntry> backingMap
private volatile boolean cacheEnabled
final ArrayList<BlockingQueue<BucketCache.RAMQueueEntry>> writerQueues
A list of writer queues, one queue per running BucketCache.WriterThread.
In other words, the work of adding blocks to the BucketCache is divided up among the running WriterThreads. This is done by taking the hash of the cache key modulo the queue count. When a WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the BucketCache. It then updates the ramCache and backingMap accordingly.
final BucketCache.WriterThread[] writerThreads
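The writerQueues description says a block is assigned to a queue by taking the hash of the cache key modulo the queue count. A minimal sketch of that idea follows; the class WriterQueuePicker and the method pickQueue are hypothetical names, and masking off the sign bit is an assumption made here so that negative hash codes still map to a valid index.

```java
// Hypothetical helper illustrating hash-modulo queue selection.
class WriterQueuePicker {
    // Maps a cache key to a queue index in [0, queueCount).
    // The mask clears the sign bit, so the remainder is never negative.
    static int pickQueue(Object cacheKey, int queueCount) {
        return (cacheKey.hashCode() & 0x7FFFFFFF) % queueCount;
    }
}
```

Because the index depends only on the key's hash, the same key always lands on the same queue, and therefore on the same writer thread.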
private volatile boolean freeInProgress
private final Lock freeSpaceLock
private UniqueIndexMap<Integer> deserialiserMap
private final AtomicLong realCacheSize
private final AtomicLong heapSize
private final AtomicLong blockNumber
private final AtomicLong accessCount
private static final int DEFAULT_CACHE_WAIT_TIME
boolean wait_when_cache
private final BucketCacheStats cacheStats
private final String persistencePath
private final long cacheCapacity
private final long blockSize
private final int ioErrorsTolerationDuration
public static final int DEFAULT_ERROR_TOLERATION_DURATION
private volatile long ioErrorStartTime
final IdReadWriteLock offsetLock
private final ConcurrentIndex<String,BlockCacheKey> blocksByHFile
private final ScheduledExecutorService scheduleThreadPool
private BucketAllocator bucketAllocator
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, IOException
Throws: FileNotFoundException, IOException
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) throws FileNotFoundException, IOException
Throws: FileNotFoundException, IOException
protected void startWriterThreads()
boolean isCacheEnabled()
public long getMaxSize()
public String getIoEngine()
private IOEngine getIOEngineFromName(String ioEngineName, long capacity) throws IOException
Parameters: ioEngineName, capacity
Throws: IOException
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf)
Specified by: cacheBlock in interface BlockCache
Parameters:
cacheKey - block's cache key
buf - block buffer
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean cacheDataInL1)
Specified by: cacheBlock in interface BlockCache
Parameters:
cacheKey - block's cache key
cachedItem - block buffer
inMemory - if block is in-memory
cacheDataInL1 -
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait)
Parameters:
cacheKey - block's cache key
cachedItem - block buffer
inMemory - if block is in-memory
wait - if true, blocking wait when queue is full
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics)
Specified by: getBlock in interface BlockCache
Parameters:
key - block's cache key
caching - true if the caller caches blocks on cache misses
repeat - Whether this is a repeat lookup for the same block
updateCacheMetrics - Whether we should update cache metrics or not
void blockEvicted(BlockCacheKey cacheKey, BucketCache.BucketEntry bucketEntry, boolean decrementBlockNumber)
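The wait parameter of cacheBlockWithWait controls what happens when the target writer queue is full. The sketch below shows one way such a flag could be honored with a standard BlockingQueue. The class WaitOnFullQueueSketch, the method enqueue and the 50 ms bound are all assumptions for illustration; this page names DEFAULT_CACHE_WAIT_TIME but does not give its value or say whether the wait is bounded.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a "wait when the queue is full" flag.
class WaitOnFullQueueSketch {
    // Assumed wait bound in milliseconds (not taken from this page).
    static final long WAIT_MILLIS = 50;

    // Returns true if the entry was queued, false if it was dropped.
    static <T> boolean enqueue(BlockingQueue<T> queue, T entry, boolean wait) {
        if (!wait) {
            // Non-blocking: give up immediately when the queue is full.
            return queue.offer(entry);
        }
        try {
            // Blocking: wait up to WAIT_MILLIS for space to appear.
            return queue.offer(entry, WAIT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status and report the entry as not queued.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Dropping the entry when the queue stays full is acceptable for a cache, since the block can be read again from its original source.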
public boolean evictBlock(BlockCacheKey cacheKey)
Specified by: evictBlock in interface BlockCache
Parameters:
cacheKey - Block to evict
public void logStats()
public long getRealCacheSize()
private long acceptableSize()
private long singleSize()
private long multiSize()
private long memorySize()
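The four private size methods above have no descriptions on this page. One plausible reading, given the factor constants in the field summary, is that each threshold is the allocator's total size scaled by the matching factor. The sketch below encodes that reading only as an assumption: the class SizeThresholdSketch, the formulas and every factor value are illustrative and are not taken from this page.

```java
// Hypothetical thresholds derived from a total size and factor constants.
class SizeThresholdSketch {
    // Illustrative values only; this page lists the constant names, not their values.
    static final double ACCEPT_FACTOR = 0.95;
    static final double MIN_FACTOR = 0.85;
    static final double SINGLE_FACTOR = 0.25;
    static final double MULTI_FACTOR = 0.50;
    static final double MEMORY_FACTOR = 0.25;

    // Used size at which freeing space would start (assumed formula).
    static long acceptableSize(long totalSize) {
        return (long) Math.floor(totalSize * ACCEPT_FACTOR);
    }

    // Share of the minimum retained size given to each priority group (assumed formula).
    static long singleSize(long totalSize) {
        return (long) Math.floor(totalSize * SINGLE_FACTOR * MIN_FACTOR);
    }

    static long multiSize(long totalSize) {
        return (long) Math.floor(totalSize * MULTI_FACTOR * MIN_FACTOR);
    }

    static long memorySize(long totalSize) {
        return (long) Math.floor(totalSize * MEMORY_FACTOR * MIN_FACTOR);
    }
}
```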
private int bucketSizesAboveThresholdCount(float minFactor)
private void freeEntireBuckets(int completelyFreeBucketsNeeded)
Parameters:
completelyFreeBucketsNeeded - number of buckets to free
private void freeSpace(String why)
Parameters:
why - Why we are being called
static List<BucketCache.RAMQueueEntry> getRAMQueueEntries(BlockingQueue<BucketCache.RAMQueueEntry> q, List<BucketCache.RAMQueueEntry> receptical) throws InterruptedException
Blocks until elements are available in q, then tries to grab as many as possible before returning.
Parameters:
receptical - Where to stash the elements taken from queue. We clear before we use it just in case.
q - The queue to take from.
Returns: receptical laden with elements taken from the queue or empty if none found.
Throws: InterruptedException
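The behavior described for getRAMQueueEntries (block until something is available, then grab as much as possible) maps naturally onto BlockingQueue.take() followed by drainTo(). The sketch below is a generic version under that assumption; the class DrainSketch and the method takeAvailable are hypothetical names, and the actual method body is not shown on this page.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical generic version of a "block, then drain" helper.
class DrainSketch {
    static <T> List<T> takeAvailable(BlockingQueue<T> q, List<T> receptical)
            throws InterruptedException {
        // Clear before use, just in case the caller passed a non-empty list.
        receptical.clear();
        // Blocks until at least one element is available.
        receptical.add(q.take());
        // Then grabs whatever else is already queued, without blocking.
        q.drainTo(receptical);
        return receptical;
    }
}
```

Draining in batches lets a writer thread amortize per-wakeup overhead across many queued blocks.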
private void persistToFile() throws IOException
Throws: IOException
private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, ClassNotFoundException
private void checkIOErrorIsTolerated()
private void disableCache()
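The fields ioErrorStartTime and ioErrorsTolerationDuration suggest that checkIOErrorIsTolerated() tracks how long IO errors have persisted and calls disableCache() once that exceeds the tolerated duration (1 min by default, per the field summary). The sketch below models that reading. The class IoErrorTolerationSketch, its method names and the reset-on-success behavior are assumptions, not the documented implementation.

```java
// Hypothetical model of an IO error toleration window.
class IoErrorTolerationSketch {
    private final long tolerationMillis;
    // -1 means no error streak is in progress.
    private long ioErrorStartTime = -1;
    private boolean cacheEnabled = true;

    IoErrorTolerationSketch(long tolerationMillis) {
        this.tolerationMillis = tolerationMillis;
    }

    // A successful IO ends the current error streak (assumed behavior).
    void onIoSuccess() {
        ioErrorStartTime = -1;
    }

    // Called on each IO error with the current time in milliseconds.
    void onIoError(long nowMillis) {
        if (ioErrorStartTime < 0) {
            // First error of a streak: remember when it started.
            ioErrorStartTime = nowMillis;
        } else if (cacheEnabled && nowMillis - ioErrorStartTime > tolerationMillis) {
            // Errors have persisted longer than tolerated: disable the cache.
            cacheEnabled = false;
        }
    }

    boolean isCacheEnabled() {
        return cacheEnabled;
    }
}
```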
private void join() throws InterruptedException
Throws: InterruptedException
public void shutdown()
Specified by: shutdown in interface BlockCache
public CacheStats getStats()
Specified by: getStats in interface BlockCache
public BucketAllocator getAllocator()
public long heapSize()
public long size()
Specified by: size in interface BlockCache
public long getFreeSize()
Specified by: getFreeSize in interface BlockCache
public long getBlockCount()
Specified by: getBlockCount in interface BlockCache
public long getCurrentSize()
Specified by: getCurrentSize in interface BlockCache
public int evictBlocksByHfileName(String hfileName)
This is used for evict-on-close to remove all blocks of a specific HFile.
Specified by: evictBlocksByHfileName in interface BlockCache
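Evicting every block of one HFile is cheap when the cache keeps a secondary index from file name to block keys, which is what the blocksByHFile field appears to provide. The sketch below shows the idea with standard collections. The class HFileIndexSketch, the string key format and the use of a plain map of sets in place of ConcurrentIndex are assumptions for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache with a per-file index of block keys.
class HFileIndexSketch {
    // Cache contents: block key -> payload (stand-in for backingMap).
    final Map<String, byte[]> blocks = new ConcurrentHashMap<>();
    // Secondary index: HFile name -> keys of its cached blocks.
    final Map<String, Set<String>> blocksByHFile = new ConcurrentHashMap<>();

    void cacheBlock(String hfileName, long offset, byte[] data) {
        String key = hfileName + "@" + offset;
        blocks.put(key, data);
        blocksByHFile
            .computeIfAbsent(hfileName, n -> ConcurrentHashMap.<String>newKeySet())
            .add(key);
    }

    // Evicts all blocks of the given file and returns how many were removed.
    int evictBlocksByHfileName(String hfileName) {
        Set<String> keys = blocksByHFile.remove(hfileName);
        if (keys == null) {
            return 0;
        }
        int evicted = 0;
        for (String key : keys) {
            if (blocks.remove(key) != null) {
                evicted++;
            }
        }
        return evicted;
    }
}
```

Without such an index, evict-on-close would have to scan every key in the cache to find the blocks of one file.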
void stopWriterThreads() throws InterruptedException
Throws: InterruptedException
public Iterator<CachedBlock> iterator()
Specified by: iterator in interface Iterable<CachedBlock>
Specified by: iterator in interface BlockCache
public BlockCache[] getBlockCaches()
Specified by: getBlockCaches in interface BlockCache
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.