001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.io.hfile.bucket; 020 021 022import java.util.Comparator; 023import java.util.Map; 024import java.util.Map.Entry; 025 026import org.apache.yetus.audience.InterfaceAudience; 027import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 028import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; 029 030import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue; 031 032/** 033 * A memory-bound queue that will grow until an element brings total size larger 034 * than maxSize. From then on, only entries that are sorted larger than the 035 * smallest current entry will be inserted/replaced. 036 * 037 * <p> 038 * Use this when you want to find the largest elements (according to their 039 * ordering, not their heap size) that consume as close to the specified maxSize 040 * as possible. Default behavior is to grow just above rather than just below 041 * specified max. 042 */ 043@InterfaceAudience.Private 044public class CachedEntryQueue { 045 046 private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue; 047 048 private long cacheSize; 049 private long maxSize; 050 051 /** 052 * @param maxSize the target size of elements in the queue 053 * @param blockSize expected average size of blocks 054 */ 055 public CachedEntryQueue(long maxSize, long blockSize) { 056 int initialSize = (int) (maxSize / blockSize); 057 if (initialSize == 0) { 058 initialSize++; 059 } 060 queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() { 061 062 @Override 063 public int compare(Entry<BlockCacheKey, BucketEntry> entry1, 064 Entry<BlockCacheKey, BucketEntry> entry2) { 065 return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue()); 066 } 067 068 }).expectedSize(initialSize).create(); 069 cacheSize = 0; 070 this.maxSize = maxSize; 071 } 072 073 /** 074 * Attempt to add the specified entry to this queue. 075 * <p> 076 * If the queue is smaller than the max size, or if the specified element is 077 * ordered after the smallest element in the queue, the element will be added 078 * to the queue. Otherwise, there is no side effect of this call. 079 * @param entry a bucket entry with key to try to add to the queue 080 */ 081 public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) { 082 if (cacheSize < maxSize) { 083 queue.add(entry); 084 cacheSize += entry.getValue().getLength(); 085 } else { 086 BucketEntry head = queue.peek().getValue(); 087 if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) { 088 cacheSize += entry.getValue().getLength(); 089 cacheSize -= head.getLength(); 090 if (cacheSize > maxSize) { 091 queue.poll(); 092 } else { 093 cacheSize += head.getLength(); 094 } 095 queue.add(entry); 096 } 097 } 098 } 099 100 /** 101 * @return The next element in this queue, or {@code null} if the queue is 102 * empty. 103 */ 104 public Map.Entry<BlockCacheKey, BucketEntry> poll() { 105 return queue.poll(); 106 } 107 108 /** 109 * @return The last element in this queue, or {@code null} if the queue is 110 * empty. 111 */ 112 public Map.Entry<BlockCacheKey, BucketEntry> pollLast() { 113 return queue.pollLast(); 114 } 115 116 /** 117 * Total size of all elements in this queue. 118 * @return size of all elements currently in queue, in bytes 119 */ 120 public long cacheSize() { 121 return cacheSize; 122 } 123}