001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.io.hfile; 020 021import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue; 022 023import org.apache.yetus.audience.InterfaceAudience; 024import org.apache.hadoop.hbase.io.HeapSize; 025 026/** 027 * A memory-bound queue that will grow until an element brings 028 * total size >= maxSize. From then on, only entries that are sorted larger 029 * than the smallest current entry will be inserted/replaced. 030 * 031 * <p>Use this when you want to find the largest elements (according to their 032 * ordering, not their heap size) that consume as close to the specified 033 * maxSize as possible. Default behavior is to grow just above rather than 034 * just below specified max. 035 * 036 * <p>Object used in this queue must implement {@link HeapSize} as well as 037 * {@link Comparable}. 038 */ 039@InterfaceAudience.Private 040public class LruCachedBlockQueue implements HeapSize { 041 042 private MinMaxPriorityQueue<LruCachedBlock> queue; 043 044 private long heapSize; 045 private long maxSize; 046 047 /** 048 * @param maxSize the target size of elements in the queue 049 * @param blockSize expected average size of blocks 050 */ 051 public LruCachedBlockQueue(long maxSize, long blockSize) { 052 int initialSize = (int)(maxSize / blockSize); 053 if(initialSize == 0) initialSize++; 054 queue = MinMaxPriorityQueue.expectedSize(initialSize).create(); 055 heapSize = 0; 056 this.maxSize = maxSize; 057 } 058 059 /** 060 * Attempt to add the specified cached block to this queue. 061 * 062 * <p>If the queue is smaller than the max size, or if the specified element 063 * is ordered before the smallest element in the queue, the element will be 064 * added to the queue. Otherwise, there is no side effect of this call. 065 * @param cb block to try to add to the queue 066 */ 067 public void add(LruCachedBlock cb) { 068 if(heapSize < maxSize) { 069 queue.add(cb); 070 heapSize += cb.heapSize(); 071 } else { 072 LruCachedBlock head = queue.peek(); 073 if(cb.compareTo(head) > 0) { 074 heapSize += cb.heapSize(); 075 heapSize -= head.heapSize(); 076 if(heapSize > maxSize) { 077 queue.poll(); 078 } else { 079 heapSize += head.heapSize(); 080 } 081 queue.add(cb); 082 } 083 } 084 } 085 086 /** 087 * @return The next element in this queue, or {@code null} if the queue is 088 * empty. 089 */ 090 public LruCachedBlock poll() { 091 return queue.poll(); 092 } 093 094 /** 095 * @return The last element in this queue, or {@code null} if the queue is 096 * empty. 097 */ 098 public LruCachedBlock pollLast() { 099 return queue.pollLast(); 100 } 101 102 /** 103 * Total size of all elements in this queue. 104 * @return size of all elements currently in queue, in bytes 105 */ 106 @Override 107 public long heapSize() { 108 return heapSize; 109 } 110}