001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021package org.apache.hadoop.hbase.io.hfile.bucket;
022
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.Comparator;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.locks.ReentrantReadWriteLock;
028
029import org.apache.hadoop.hbase.io.ByteBuffAllocator;
030import org.apache.hadoop.hbase.io.hfile.BlockPriority;
031import org.apache.hadoop.hbase.io.hfile.Cacheable;
032import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
033import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
034import org.apache.hadoop.hbase.nio.ByteBuff;
035import org.apache.hadoop.hbase.nio.HBaseReferenceCounted;
036import org.apache.hadoop.hbase.nio.RefCnt;
037import org.apache.hadoop.hbase.util.IdReadWriteLock;
038import org.apache.yetus.audience.InterfaceAudience;
039
040/**
041 * Item in cache. We expect this to be where most memory goes. Java uses 8 bytes just for object
042 * headers; after this, we want to use as little as possible - so we only use 8 bytes, but in order
043 * to do so we end up messing around with all this Java casting stuff. Offset stored as 5 bytes that
044 * make up the long. Doubt we'll see devices this big for ages. Offsets are divided by 256. So 5
045 * bytes gives us 256TB or so.
046 */
047@InterfaceAudience.Private
048class BucketEntry implements HBaseReferenceCounted {
049  // access counter comparator, descending order
050  static final Comparator<BucketEntry> COMPARATOR =
051      Comparator.comparingLong(BucketEntry::getAccessCounter).reversed();
052
053  private int offsetBase;
054  private int length;
055  private byte offset1;
056
057  /**
058   * The index of the deserializer that can deserialize this BucketEntry content. See
059   * {@link CacheableDeserializerIdManager} for hosting of index to serializers.
060   */
061  byte deserializerIndex;
062
063  private volatile long accessCounter;
064  private BlockPriority priority;
065
066  /**
067   * The RefCnt means how many paths are referring the {@link BucketEntry}, each RPC reading path is
068   * considering as one path, the {@link BucketCache#backingMap} reference is also considered a
069   * path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, then the HFileBlocks
070   * the two RPC referred will share the same refCnt instance with the BucketEntry. so the refCnt
071   * will increase or decrease as the following: <br>
072   * 1. when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, the
073   * refCnt ++; <br>
074   * 2. If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; it
075   * usually happen when HFile is closing or someone call the clearBucketCache by force. <br>
076   * 3. The read RPC path start to refer the block which is backend by the memory area in
077   * bucketEntry, then refCnt ++ ; <br>
078   * 4. The read RPC patch shipped the response, and release the block. then refCnt--; <br>
079   * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
080   */
081  private final RefCnt refCnt;
082  final AtomicBoolean markedAsEvicted;
083  final ByteBuffAllocator allocator;
084
085  /**
086   * Time this block was cached. Presumes we are created just before we are added to the cache.
087   */
088  private final long cachedTime = System.nanoTime();
089
090  BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
091    this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP);
092  }
093
094  BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt,
095      ByteBuffAllocator allocator) {
096    setOffset(offset);
097    this.length = length;
098    this.accessCounter = accessCounter;
099    this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
100    this.refCnt = refCnt;
101    this.markedAsEvicted = new AtomicBoolean(false);
102    this.allocator = allocator;
103  }
104
105  long offset() {
106    // Java has no unsigned numbers, so this needs the L cast otherwise it will be sign extended
107    // as a negative number.
108    long o = ((long) offsetBase) & 0xFFFFFFFFL;
109    // The 0xFF here does not need the L cast because it is treated as a positive int.
110    o += (((long) (offset1)) & 0xFF) << 32;
111    return o << 8;
112  }
113
114  private void setOffset(long value) {
115    assert (value & 0xFF) == 0;
116    value >>= 8;
117    offsetBase = (int) value;
118    offset1 = (byte) (value >> 32);
119  }
120
121  public int getLength() {
122    return length;
123  }
124
125  CacheableDeserializer<Cacheable> deserializerReference() {
126    return CacheableDeserializerIdManager.getDeserializer(deserializerIndex);
127  }
128
129  void setDeserializerReference(CacheableDeserializer<Cacheable> deserializer) {
130    this.deserializerIndex = (byte) deserializer.getDeserializerIdentifier();
131  }
132
133  long getAccessCounter() {
134    return accessCounter;
135  }
136
137  /**
138   * Block has been accessed. Update its local access counter.
139   */
140  void access(long accessCounter) {
141    this.accessCounter = accessCounter;
142    if (this.priority == BlockPriority.SINGLE) {
143      this.priority = BlockPriority.MULTI;
144    }
145  }
146
147  public BlockPriority getPriority() {
148    return this.priority;
149  }
150
151  long getCachedTime() {
152    return cachedTime;
153  }
154
155  /**
156   * The {@link BucketCache} will try to release its reference to this BucketEntry many times. we
157   * must make sure the idempotent, otherwise it'll decrease the RPC's reference count in advance,
158   * then for RPC memory leak happen.
159   * @return true if we deallocate this entry successfully.
160   */
161  boolean markAsEvicted() {
162    if (markedAsEvicted.compareAndSet(false, true)) {
163      return this.release();
164    }
165    return false;
166  }
167
168  /**
169   * Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed
170   * the max acceptable size.
171   * @return true if we deallocate this entry successfully.
172   */
173  boolean markStaleAsEvicted() {
174    if (!markedAsEvicted.get() && this.refCnt() == 1) {
175      // The only reference was coming from backingMap, now release the stale entry.
176      return this.markAsEvicted();
177    }
178    return false;
179  }
180
181  /**
182   * Check whether have some RPC patch referring this block. There're two case: <br>
183   * 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
184   * 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has
185   * released its reference, the remaining reference can only be from RPC path. <br>
186   * We use this check to decide whether we can free the block area: when cached size exceed the
187   * acceptable size, our eviction policy will choose those stale blocks without any RPC reference
188   * and the RPC referred block will be excluded.
189   * @return true to indicate there're some RPC referring the block.
190   */
191  boolean isRpcRef() {
192    boolean evicted = markedAsEvicted.get();
193    return this.refCnt() > 1 || (evicted && refCnt() == 1);
194  }
195
196  Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
197    return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
198  }
199
200  Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
201    return this.deserializerReference().deserialize(buf, allocator);
202  }
203
204  interface BucketEntryHandler<T> {
205    T handle();
206  }
207
208  <T> T withWriteLock(IdReadWriteLock<Long> offsetLock, BucketEntryHandler<T> handler) {
209    ReentrantReadWriteLock lock = offsetLock.getLock(this.offset());
210    try {
211      lock.writeLock().lock();
212      return handler.handle();
213    } finally {
214      lock.writeLock().unlock();
215    }
216  }
217
218  @Override
219  public int refCnt() {
220    return this.refCnt.refCnt();
221  }
222
223  @Override
224  public BucketEntry retain() {
225    refCnt.retain();
226    return this;
227  }
228
229  /**
230   * We've three cases to release refCnt now: <br>
231   * 1. BucketCache#evictBlock, it will release the backingMap's reference by force because we're
232   * closing file or clear the bucket cache or some corruption happen. when all rpc references gone,
233   * then free the area in bucketAllocator. <br>
234   * 2. BucketCache#returnBlock . when rpc shipped, we'll release the block, only when backingMap
235   * also release its refCnt (case.1 will do this) and no other rpc reference, then it will free the
236   * area in bucketAllocator. <br>
237   * 3.evict those block without any rpc reference if cache size exceeded. we'll only free those
238   * blocks with zero rpc reference count, as the {@link BucketEntry#markStaleAsEvicted()} do.
239   * @return true to indicate we've decreased to zero and do the de-allocation.
240   */
241  @Override
242  public boolean release() {
243    return refCnt.release();
244  }
245}