/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.HBaseReferenceCounted;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Item in cache. We expect this to be where most memory goes. Java uses 8 bytes just for object
 * headers; after this, we want to use as little as possible - so we only use 8 bytes, but in order
 * to do so we end up messing around with all this Java casting stuff. The offset is stored as 5
 * bytes (an {@code int} plus a {@code byte}) that are reassembled into a long. Offsets are divided
 * by 256 before being stored, so 5 bytes gives us 256TB or so. Doubt we'll see devices this big
 * for ages.
 */
@InterfaceAudience.Private
class BucketEntry implements HBaseReferenceCounted {
  /** Orders entries by access counter, descending (largest counter first). */
  static final Comparator<BucketEntry> COMPARATOR =
    Comparator.comparingLong(BucketEntry::getAccessCounter).reversed();

  /** Low 32 bits of (offset >> 8). Interpreted as unsigned when the offset is rebuilt. */
  private int offsetBase;
  private int length;
  /** Bits 32..39 of (offset >> 8). Interpreted as unsigned when the offset is rebuilt. */
  private byte offset1;

  /**
   * The index of the deserializer that can deserialize this BucketEntry content. See
   * {@link CacheableDeserializerIdManager} for hosting of index to serializers.
   */
  byte deserializerIndex;

  private volatile long accessCounter;
  private BlockPriority priority;

  /**
   * The RefCnt means how many paths are referring the {@link BucketEntry}, each RPC reading path is
   * considering as one path, the {@link BucketCache#backingMap} reference is also considered a
   * path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, then the HFileBlocks
   * the two RPC referred will share the same refCnt instance with the BucketEntry. so the refCnt
   * will increase or decrease as the following: <br>
   * 1. when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, the
   * refCnt ++; <br>
   * 2. If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; it
   * usually happen when HFile is closing or someone call the clearBucketCache by force. <br>
   * 3. The read RPC path start to refer the block which is backed by the memory area in
   * bucketEntry, then refCnt ++ ; <br>
   * 4. The read RPC path shipped the response, and release the block. then refCnt--; <br>
   * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
   */
  private final RefCnt refCnt;
  final AtomicBoolean markedAsEvicted;
  final ByteBuffAllocator allocator;

  /**
   * Time this block was cached. Presumes we are created just before we are added to the cache.
   */
  private final long cachedTime = System.nanoTime();

  /**
   * Creates an entry with a fresh {@link RefCnt} and the {@link ByteBuffAllocator#HEAP} allocator.
   * @param offset        byte offset of the block; the low 8 bits must be zero
   * @param length        length of the block
   * @param accessCounter initial access counter value
   * @param inMemory      whether the entry gets {@link BlockPriority#MEMORY} priority; otherwise
   *                      it gets {@link BlockPriority#MULTI}
   */
  BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
    this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP);
  }

  /**
   * Creates an entry using the supplied reference counter and allocator.
   * @param offset        byte offset of the block; the low 8 bits must be zero
   * @param length        length of the block
   * @param accessCounter initial access counter value
   * @param inMemory      whether the entry gets {@link BlockPriority#MEMORY} priority; otherwise
   *                      it gets {@link BlockPriority#MULTI}
   * @param refCnt        reference counter this entry delegates retain/release to
   * @param allocator     allocator handed to the deserializer in
   *                      {@link #wrapAsCacheable(ByteBuff)}
   */
  BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt,
    ByteBuffAllocator allocator) {
    setOffset(offset);
    this.length = length;
    this.accessCounter = accessCounter;
    this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
    this.refCnt = refCnt;
    this.markedAsEvicted = new AtomicBoolean(false);
    this.allocator = allocator;
  }

  /**
   * Rebuilds the byte offset from its packed 5-byte representation.
   * @return the offset passed to the constructor (always a multiple of 256)
   */
  long offset() {
    // Java has no unsigned numbers, so this needs the L cast otherwise it will be sign extended
    // as a negative number.
    long o = ((long) offsetBase) & 0xFFFFFFFFL;
    // The 0xFF here does not need the L cast because it is treated as a positive int.
    o += (((long) (offset1)) & 0xFF) << 32;
    return o << 8;
  }

  /**
   * Packs the offset into {@link #offsetBase} and {@link #offset1} after dividing it by 256.
   * @param value byte offset; asserted to have its low 8 bits clear
   */
  private void setOffset(long value) {
    assert (value & 0xFF) == 0;
    value >>= 8;
    offsetBase = (int) value;
    offset1 = (byte) (value >> 32);
  }

  public int getLength() {
    return length;
  }

  /**
   * @return the deserializer registered under {@link #deserializerIndex}
   */
  CacheableDeserializer<Cacheable> deserializerReference() {
    return CacheableDeserializerIdManager.getDeserializer(deserializerIndex);
  }

  /**
   * Records the identifier of the given deserializer, narrowed to a byte.
   */
  void setDeserializerReference(CacheableDeserializer<Cacheable> deserializer) {
    this.deserializerIndex = (byte) deserializer.getDeserializerIdentifier();
  }

  long getAccessCounter() {
    return accessCounter;
  }

  /**
   * Block has been accessed. Update its local access counter, and promote a
   * {@link BlockPriority#SINGLE} entry to {@link BlockPriority#MULTI}.
   */
  void access(long accessCounter) {
    this.accessCounter = accessCounter;
    if (this.priority == BlockPriority.SINGLE) {
      this.priority = BlockPriority.MULTI;
    }
  }

  public BlockPriority getPriority() {
    return this.priority;
  }

  long getCachedTime() {
    return cachedTime;
  }

  /**
   * The {@link BucketCache} will try to release its reference to this BucketEntry many times. We
   * must make sure this is idempotent, otherwise it would decrease the RPC's reference count in
   * advance and cause an RPC memory leak. Only the first caller to flip
   * {@link #markedAsEvicted} performs the release.
   * @return true if we deallocate this entry successfully.
   */
  boolean markAsEvicted() {
    if (markedAsEvicted.compareAndSet(false, true)) {
      return this.release();
    }
    return false;
  }

  /**
   * Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed
   * the max acceptable size.
   * <p>
   * NOTE(review): the refCnt check and the following {@link #markAsEvicted()} are separate steps;
   * presumably callers serialize this against readers (e.g. via the offset lock) - confirm.
   * @return true if we deallocate this entry successfully.
   */
  boolean markStaleAsEvicted() {
    if (!markedAsEvicted.get() && this.refCnt() == 1) {
      // The only reference was coming from backingMap, now release the stale entry.
      return this.markAsEvicted();
    }
    return false;
  }

  /**
   * Check whether have some RPC path referring this block. There're two case: <br>
   * 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
   * 2. If current refCnt is equal to 1 and the markedAsEvicted is true, then it means backingMap
   * has released its reference, the remaining reference can only be from RPC path. <br>
   * We use this check to decide whether we can free the block area: when cached size exceed the
   * acceptable size, our eviction policy will choose those stale blocks without any RPC reference
   * and the RPC referred block will be excluded.
   * @return true to indicate there're some RPC referring the block.
   */
  boolean isRpcRef() {
    boolean evicted = markedAsEvicted.get();
    // Read the counter once so both clauses are evaluated against the same value.
    int count = this.refCnt();
    return count > 1 || (evicted && count == 1);
  }

  /**
   * Wraps the buffers, sharing this entry's {@link RefCnt}, and deserializes them.
   */
  Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
    return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
  }

  /**
   * Deserializes the buffer with this entry's deserializer and allocator.
   */
  Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
    return this.deserializerReference().deserialize(buf, allocator);
  }

  /**
   * Callback executed by {@link #withWriteLock(IdReadWriteLock, BucketEntryHandler)} while the
   * write lock is held.
   */
  @FunctionalInterface
  interface BucketEntryHandler<T> {
    T handle();
  }

  /**
   * Runs the handler while holding the write lock that {@code offsetLock} associates with this
   * entry's offset.
   * @param offsetLock source of the per-offset lock
   * @param handler    work to run under the write lock
   * @return whatever the handler returns
   */
  <T> T withWriteLock(IdReadWriteLock<Long> offsetLock, BucketEntryHandler<T> handler) {
    ReentrantReadWriteLock lock = offsetLock.getLock(this.offset());
    // Acquire before entering the try block: if lock() itself fails we must not reach the
    // finally clause and call unlock() on a lock this thread does not hold.
    lock.writeLock().lock();
    try {
      return handler.handle();
    } finally {
      lock.writeLock().unlock();
    }
  }

  @Override
  public int refCnt() {
    return this.refCnt.refCnt();
  }

  @Override
  public BucketEntry retain() {
    refCnt.retain();
    return this;
  }

  /**
   * We've three cases to release refCnt now: <br>
   * 1. BucketCache#evictBlock, it will release the backingMap's reference by force because we're
   * closing file or clear the bucket cache or some corruption happen. when all rpc references gone,
   * then free the area in bucketAllocator. <br>
   * 2. BucketCache#returnBlock . when rpc shipped, we'll release the block, only when backingMap
   * also release its refCnt (case.1 will do this) and no other rpc reference, then it will free the
   * area in bucketAllocator. <br>
   * 3.evict those block without any rpc reference if cache size exceeded. we'll only free those
   * blocks with zero rpc reference count, as the {@link BucketEntry#markStaleAsEvicted()} do.
   * @return true to indicate we've decreased to zero and do the de-allocation.
   */
  @Override
  public boolean release() {
    return refCnt.release();
  }
}