/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.HBaseReferenceCounted;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Item in cache. We expect this to be where most memory goes. Java uses 8 bytes just for object
 * headers; after this, we want to use as little as possible, so we only use 8 bytes, but in order
 * to do so we end up messing around with all this Java casting stuff. The offset is stored as 5
 * bytes that make up the long: offsets are divided by 256 before being stored, so 5 bytes can
 * address roughly 256TB, which is larger than any device we expect to see for a long time.
 */
@InterfaceAudience.Private
class BucketEntry implements HBaseReferenceCounted {
  // access counter comparator, descending order
  static final Comparator<BucketEntry> COMPARATOR =
    Comparator.comparingLong(BucketEntry::getAccessCounter).reversed();

  private int offsetBase;
  private int length;
  private byte offset1;

  /**
   * The index of the deserializer that can deserialize this BucketEntry content. See
   * {@link CacheableDeserializerIdManager} for the mapping from index to deserializer.
   */
  byte deserializerIndex;

  private volatile long accessCounter;
  private BlockPriority priority;

  /**
   * <pre>
   * The RefCnt records how many paths are referring to the {@link BucketEntry}; there are two cases:
   * 1. If {@link IOEngine#usesSharedMemory()} is false (e.g. {@link FileIOEngine}), the refCnt is
   *    always 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}. Even
   *    if the corresponding {@link HFileBlock} is referenced by an RPC read, the refCnt does not
   *    increase.
   *
   * 2. If {@link IOEngine#usesSharedMemory()} is true (e.g. {@link ByteBufferIOEngine}), each RPC
   *    read path is considered one path, and the {@link BucketCache#backingMap} reference is also
   *    considered a path. NOTICE that if two read RPC paths hit the same {@link BucketEntry}, the
   *    {@link HFileBlock}s the two RPCs refer to share the same refCnt instance with the
   *    {@link BucketEntry}, so the refCnt increases or decreases as follows:
   *    (1) when the writer thread flushes the block into the IOEngine and adds the bucketEntry
   *        into the backingMap, the refCnt ++;
   *    (2) if the BucketCache evicts the block and moves the bucketEntry out of the backingMap,
   *        the refCnt --; this usually happens when an HFile is being closed or someone forcibly
   *        clears the bucket cache;
   *    (3) when a read RPC path starts to refer to the block backed by the memory area in the
   *        bucketEntry, the refCnt ++;
   *    (4) when the read RPC path has shipped the response and released the block, the refCnt --.
   *    Once the refCnt decreases to zero, the {@link BucketAllocator} will free the block area.
   * </pre>
   */
  private final RefCnt refCnt;
  final AtomicBoolean markedAsEvicted;
  final ByteBuffAllocator allocator;

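  // Illustrative sketch (not executed anywhere): for a shared-memory engine such as
  // ByteBufferIOEngine, the refCnt of a single entry typically evolves like this. The calls below
  // are conceptual; in practice the RPC path retains/releases through the shared RefCnt of the
  // wrapped block rather than calling these methods directly.
  //
  //   BucketEntry entry = ...;  // writer thread adds it to backingMap, refCnt == 1
  //   entry.retain();           // an RPC read path starts referring to the block, refCnt == 2
  //   entry.release();          // the RPC response is shipped, refCnt == 1
  //   entry.markAsEvicted();    // backingMap drops its reference, refCnt == 0, area is freed
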
  /**
   * Time this block was cached. Presumes we are created just before we are added to the cache.
   */
  private final long cachedTime = System.nanoTime();

  /**
   * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt}
   *                       becomes 0. NOTICE that {@link ByteBuffAllocator#NONE} should only be
   *                       used in tests.
   */
  BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
    Function<BucketEntry, Recycler> createRecycler, ByteBuffAllocator allocator) {
    if (createRecycler == null) {
      throw new IllegalArgumentException("createRecycler must not be null!");
    }
    setOffset(offset);
    this.length = length;
    this.accessCounter = accessCounter;
    this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
    this.refCnt = RefCnt.create(createRecycler.apply(this));
    this.markedAsEvicted = new AtomicBoolean(false);
    this.allocator = allocator;
  }

  long offset() {
    // Java has no unsigned numbers, so this needs the L cast otherwise it will be sign extended
    // as a negative number.
    long o = ((long) offsetBase) & 0xFFFFFFFFL;
    // The 0xFF here does not need the L cast because it is treated as a positive int.
    o += (((long) (offset1)) & 0xFF) << 32;
    return o << 8;
  }

  private void setOffset(long value) {
    assert (value & 0xFF) == 0;
    value >>= 8;
    offsetBase = (int) value;
    offset1 = (byte) (value >> 32);
  }
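
  // Worked example of the 5-byte offset encoding above (illustration only): a cached block's
  // offset is always a multiple of 256, so setOffset drops the low 8 bits and offset() restores
  // them. With 32 bits in offsetBase plus 8 bits in offset1 we can address 2^40 * 256 = 256TB.
  //
  //   setOffset(0x123456700L); // value >> 8 == 0x01234567 -> offsetBase = 0x01234567, offset1 = 0
  //   offset();                // (0x01234567L & 0xFFFFFFFFL) << 8 == 0x123456700L, round-trips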

  public int getLength() {
    return length;
  }

  CacheableDeserializer<Cacheable> deserializerReference() {
    return CacheableDeserializerIdManager.getDeserializer(deserializerIndex);
  }

  void setDeserializerReference(CacheableDeserializer<Cacheable> deserializer) {
    this.deserializerIndex = (byte) deserializer.getDeserializerIdentifier();
  }

  long getAccessCounter() {
    return accessCounter;
  }

  /**
   * Block has been accessed. Update its local access counter.
   */
  void access(long accessCounter) {
    this.accessCounter = accessCounter;
    if (this.priority == BlockPriority.SINGLE) {
      this.priority = BlockPriority.MULTI;
    }
  }

  public BlockPriority getPriority() {
    return this.priority;
  }

  long getCachedTime() {
    return cachedTime;
  }

  /**
   * The {@link BucketCache} may try to release its reference to this BucketEntry many times, so we
   * must make sure this operation is idempotent; otherwise the RPC path's reference count would be
   * decreased prematurely and cause an RPC-side memory leak.
   * @return true if we deallocate this entry successfully.
   */
  boolean markAsEvicted() {
    if (markedAsEvicted.compareAndSet(false, true)) {
      return this.release();
    }
    return false;
  }

  /**
   * Check whether any RPC path is referring to this block.<br/>
   * When {@link IOEngine#usesSharedMemory()} is true (e.g. {@link ByteBufferIOEngine}), there are
   * two cases: <br>
   * 1. If the current refCnt is greater than 1, there must be at least one referring RPC path; <br>
   * 2. If the current refCnt is equal to 1 and markedAsEvicted is true, the backingMap has already
   * released its reference, so the remaining reference can only be from an RPC path. <br>
   * We use this check to decide whether we can free the block area: when the cached size exceeds
   * the acceptable size, our eviction policy chooses stale blocks without any RPC reference, and
   * blocks referred to by RPCs are excluded. <br/>
   * <br/>
   * When {@link IOEngine#usesSharedMemory()} is false (e.g. {@link FileIOEngine}),
   * {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap},
   * so {@link BucketEntry#isRpcRef()} always returns false.
   * @return true if some RPC path is referring to this block.
   */
  boolean isRpcRef() {
    boolean evicted = markedAsEvicted.get();
    return this.refCnt() > 1 || (evicted && refCnt() == 1);
  }

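  // Illustrative states for a shared-memory IOEngine (comments only, nothing is executed here):
  //   refCnt() == 1, markedAsEvicted == false -> only the backingMap refers to it, isRpcRef() == false
  //   refCnt() == 2, markedAsEvicted == false -> backingMap plus one RPC path,     isRpcRef() == true
  //   refCnt() == 1, markedAsEvicted == true  -> backingMap already released,      isRpcRef() == true
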
  Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
    return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
  }

  Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
    return this.deserializerReference().deserialize(buf, allocator);
  }

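  // Hypothetical usage sketch: a read path fills a ByteBuffer of getLength() bytes from the
  // backing storage and wraps it so that the resulting block shares this entry's refCnt. The
  // variable names and the surrounding read logic are assumptions, not BucketCache code.
  //
  //   ByteBuffer bb = ByteBuffer.allocate(entry.getLength());
  //   // ... fill bb from the IOEngine at entry.offset() ...
  //   Cacheable block = entry.wrapAsCacheable(new ByteBuffer[] { bb });
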
  interface BucketEntryHandler<T> {
    T handle();
  }

  <T> T withWriteLock(IdReadWriteLock<Long> offsetLock, BucketEntryHandler<T> handler) {
    ReentrantReadWriteLock lock = offsetLock.getLock(this.offset());
    try {
      lock.writeLock().lock();
      return handler.handle();
    } finally {
      lock.writeLock().unlock();
    }
  }
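
  // Hypothetical usage sketch: callers serialize mutations for one cache offset by running the
  // critical section under the per-offset write lock. "offsetLock" here is a placeholder for the
  // IdReadWriteLock instance owned by the cache.
  //
  //   boolean evicted = entry.withWriteLock(offsetLock, () -> {
  //     // remove the entry from the backingMap here, then mark it evicted
  //     return entry.markAsEvicted();
  //   });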

  @Override
  public int refCnt() {
    return this.refCnt.refCnt();
  }

  @Override
  public BucketEntry retain() {
    refCnt.retain();
    return this;
  }

  /**
   * There are three cases in which the refCnt is released: <br>
   * 1. BucketCache#evictBlock: it forcibly releases the backingMap's reference because we are
   * closing an HFile, clearing the bucket cache, or some corruption happened; once all RPC
   * references are gone, the area in the bucketAllocator is freed. <br>
   * 2. BucketCache#returnBlock: when the RPC has shipped, we release the block; only when the
   * backingMap has also released its refCnt (case 1 does this) and there is no other RPC
   * reference will the area in the bucketAllocator be freed. <br>
   * 3. Eviction of blocks without any RPC reference when the cache size is exceeded: we only free
   * blocks with a zero RPC reference count, as {@link BucketEntry#markAsEvicted()} does.
   * @return true to indicate the refCnt decreased to zero and the area was de-allocated.
   */
  @Override
  public boolean release() {
    return refCnt.release();
  }
}