001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.util.Comparator; 021import java.util.Map; 022import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 023import org.apache.yetus.audience.InterfaceAudience; 024 025import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 026import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue; 027 028/** 029 * A memory-bound queue that will grow until an element brings total size larger than maxSize. From 030 * then on, only entries that are sorted larger than the smallest current entry will be 031 * inserted/replaced. 032 * <p> 033 * Use this when you want to find the largest elements (according to their ordering, not their heap 034 * size) that consume as close to the specified maxSize as possible. Default behavior is to grow 035 * just above rather than just below specified max. 036 */ 037@InterfaceAudience.Private 038public class CachedEntryQueue { 039 040 private static final Comparator<Map.Entry<BlockCacheKey, BucketEntry>> COMPARATOR = 041 (a, b) -> BucketEntry.COMPARATOR.compare(a.getValue(), b.getValue()); 042 043 private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue; 044 045 private long cacheSize; 046 private long maxSize; 047 048 /** 049 * @param maxSize the target size of elements in the queue 050 * @param blockSize expected average size of blocks 051 */ 052 public CachedEntryQueue(long maxSize, long blockSize) { 053 Preconditions.checkArgument(blockSize > 0, "negative blockSize %s", blockSize); 054 Preconditions.checkArgument(maxSize > 0, "negative maxSize %s", maxSize); 055 int initialSize = (int) (maxSize / blockSize); 056 if (initialSize == 0) { 057 initialSize++; 058 } 059 queue = MinMaxPriorityQueue.orderedBy(COMPARATOR).expectedSize(initialSize).create(); 060 cacheSize = 0; 061 this.maxSize = maxSize; 062 } 063 064 /** 065 * Attempt to add the specified entry to this queue. 066 * <p> 067 * If the queue is smaller than the max size, or if the specified element is ordered after the 068 * smallest element in the queue, the element will be added to the queue. Otherwise, there is no 069 * side effect of this call. 070 * @param entry a bucket entry with key to try to add to the queue 071 */ 072 @edu.umd.cs.findbugs.annotations.SuppressWarnings( 073 value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", 074 justification = "head can not be null as cacheSize is greater than maxSize," 075 + " which means we have something in the queue") 076 public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) { 077 if (cacheSize < maxSize) { 078 queue.add(entry); 079 cacheSize += entry.getValue().getLength(); 080 } else { 081 BucketEntry head = queue.peek().getValue(); 082 if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) { 083 cacheSize += entry.getValue().getLength(); 084 cacheSize -= head.getLength(); 085 if (cacheSize > maxSize) { 086 queue.poll(); 087 } else { 088 cacheSize += head.getLength(); 089 } 090 queue.add(entry); 091 } 092 } 093 } 094 095 /** Returns The next element in this queue, or {@code null} if the queue is empty. */ 096 public Map.Entry<BlockCacheKey, BucketEntry> poll() { 097 return queue.poll(); 098 } 099 100 /** Returns The last element in this queue, or {@code null} if the queue is empty. */ 101 public Map.Entry<BlockCacheKey, BucketEntry> pollLast() { 102 return queue.pollLast(); 103 } 104}