/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
 * A generic bounded blocking Priority-Queue. The elements of the priority queue are ordered
 * according to the Comparator provided at queue construction time. If multiple elements have the
 * same priority this queue orders them in FIFO (first-in-first-out) manner. The head of this queue
 * is the least element with respect to the specified ordering. If multiple elements are tied for
 * least value, the head is the first one inserted. The queue retrieval operations poll, remove,
 * peek, and element access the element at the head of the queue.
 * <p>
 * Every operation that reads or modifies the queue contents runs while holding a single
 * {@link ReentrantLock}. {@link #iterator()} and {@link #remove(Object)} are not supported and
 * throw {@link UnsupportedOperationException}.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
  /**
   * Fixed-capacity sorted array backing the blocking queue. The live elements occupy
   * {@code objects[head .. tail)} in ascending comparator order. This class performs no locking
   * and no capacity or emptiness checks of its own; the enclosing queue does both before calling
   * in.
   */
  private static class PriorityQueue<E> {
    private final Comparator<? super E> comparator;
    private final E[] objects;

    // Index of the least element (inclusive) and one past the greatest element (exclusive).
    private int head = 0;
    private int tail = 0;

    /**
     * @param capacity   maximum number of elements that can be stored
     * @param comparator ordering applied to the stored elements
     */
    @SuppressWarnings("unchecked")
    public PriorityQueue(int capacity, Comparator<? super E> comparator) {
      // Generic array creation is not allowed; the array never escapes this class.
      this.objects = (E[]) new Object[capacity];
      this.comparator = comparator;
    }

    /**
     * Inserts {@code elem} at its sorted position, after any elements that compare equal to it so
     * that ties keep insertion order. The caller must make sure {@link #remainingCapacity()} is
     * greater than zero.
     * @param elem the element to insert
     */
    public void add(E elem) {
      if (tail == objects.length) {
        // The live range touches the end of the array: slide it down to index 0.
        //   |-----AAAAAAA|  ->  |AAAAAAA-----|
        int oldTail = tail;
        tail -= head;
        System.arraycopy(objects, head, objects, 0, tail);
        // Clear the vacated slots; otherwise the stale duplicates would keep elements reachable
        // after they have been polled from their new position.
        Arrays.fill(objects, tail, oldTail, null);
        head = 0;
      }

      if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
        // Append: empty queue, or elem is not smaller than the current greatest element.
        objects[tail++] = elem;
      } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
        // Prepend: elem is strictly smaller than the head and there is a free slot before it.
        objects[--head] = elem;
      } else {
        // Insert in the middle. objects[tail - 1] is known to be greater than elem here, so the
        // search only needs to cover [head, tail - 1).
        int index = upperBound(head, tail - 1, elem);
        System.arraycopy(objects, index, objects, index + 1, tail - index);
        objects[index] = elem;
        tail++;
      }
    }

    /**
     * @return the least element without removing it, or {@code null} if the queue is empty
     */
    public E peek() {
      return (head != tail) ? objects[head] : null;
    }

    /**
     * Removes and returns the element at {@code head}. No emptiness check is done here; the
     * enclosing queue only calls this when {@link #size()} is greater than zero.
     * @return the element that was stored at the head slot
     */
    public E poll() {
      E elem = objects[head];
      objects[head] = null; // release the reference
      head = (head + 1) % objects.length;
      if (head == 0) {
        // head wrapped, so the removed element sat in the last slot and was the only one left:
        // restart from the beginning of the array.
        tail = 0;
      }
      return elem;
    }

    /**
     * @return the number of elements currently stored
     */
    public int size() {
      return tail - head;
    }

    /**
     * @return the comparator supplied at construction time
     */
    public Comparator<? super E> comparator() {
      return this.comparator;
    }

    /**
     * @param o the object to look for
     * @return {@code true} if at least one stored element is {@code equals} to {@code o};
     *         {@code false} otherwise, including when {@code o} is {@code null}
     */
    public boolean contains(Object o) {
      if (o == null) {
        return false;
      }
      for (int i = head; i < tail; ++i) {
        if (o.equals(objects[i])) {
          return true;
        }
      }
      return false;
    }

    /**
     * @return the number of additional elements that can be stored
     */
    public int remainingCapacity() {
      return this.objects.length - (tail - head);
    }

    /**
     * Binary search over {@code objects[start .. end)}.
     * @return the first index in {@code [start, end)} whose element compares greater than
     *         {@code key}, or {@code end} if there is none
     */
    private int upperBound(int start, int end, E key) {
      while (start < end) {
        int mid = start + ((end - start) >> 1);
        E mitem = objects[mid];
        int cmp = comparator.compare(mitem, key);
        if (cmp > 0) {
          end = mid;
        } else {
          start = mid + 1;
        }
      }
      return start;
    }
  }

  // Lock used for all operations
  private final ReentrantLock lock = new ReentrantLock();

  // Condition for blocking when empty
  private final Condition notEmpty = lock.newCondition();

  // Wait queue for waiting puts
  private final Condition notFull = lock.newCondition();

  private final PriorityQueue<E> queue;

  /**
   * Creates a PriorityQueue with the specified capacity that orders its elements according to the
   * specified comparator.
   * @param capacity   the capacity of this queue
   * @param comparator the comparator that will be used to order this priority queue
   */
  public BoundedPriorityBlockingQueue(int capacity, Comparator<? super E> comparator) {
    this.queue = new PriorityQueue<>(capacity, comparator);
  }

  /**
   * Inserts the element if there is free capacity, without waiting.
   * @param e the element to add
   * @return {@code true} if the element was added, {@code false} if the queue is full
   * @throws NullPointerException if {@code e} is null
   */
  @Override
  public boolean offer(E e) {
    if (e == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      if (queue.remainingCapacity() > 0) {
        this.queue.add(e);
        notEmpty.signal();
        return true;
      }
    } finally {
      lock.unlock();
    }
    return false;
  }

  /**
   * Inserts the element, waiting for free capacity if the queue is full.
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   * @throws InterruptedException if interrupted while acquiring the lock or while waiting for
   *                              space
   */
  @Override
  public void put(E e) throws InterruptedException {
    if (e == null) {
      throw new NullPointerException();
    }

    // Interruptible acquisition, like the other blocking operations of this class.
    lock.lockInterruptibly();
    try {
      while (queue.remainingCapacity() == 0) {
        notFull.await();
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the element, waiting up to the given time for free capacity.
   * @param e       the element to add
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    the unit of {@code timeout}
   * @return {@code true} if the element was added, {@code false} if the wait time elapsed first
   * @throws NullPointerException if {@code e} is null
   * @throws InterruptedException if interrupted while acquiring the lock or while waiting
   */
  @Override
  public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) {
      throw new NullPointerException();
    }
    long nanos = unit.toNanos(timeout);

    lock.lockInterruptibly();
    try {
      while (queue.remainingCapacity() == 0) {
        if (nanos <= 0) {
          return false;
        }
        nanos = notFull.awaitNanos(nanos);
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
    return true;
  }

  /**
   * Removes and returns the head of the queue, waiting until an element is available.
   * @return the head of the queue
   * @throws InterruptedException if interrupted while acquiring the lock or while waiting
   */
  @Override
  public E take() throws InterruptedException {
    E result = null;
    lock.lockInterruptibly();
    try {
      while (queue.size() == 0) {
        notEmpty.await();
      }
      result = queue.poll();
      notFull.signal();
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue without waiting.
   * @return the head of the queue, or {@code null} if the queue is empty
   */
  @Override
  public E poll() {
    E result = null;
    lock.lock();
    try {
      if (queue.size() > 0) {
        result = queue.poll();
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue, waiting up to the given time for an element.
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    the unit of {@code timeout}
   * @return the head of the queue, or {@code null} if the wait time elapsed with the queue empty
   * @throws InterruptedException if interrupted while acquiring the lock or while waiting
   */
  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    E result = null;
    try {
      while (queue.size() == 0 && nanos > 0) {
        nanos = notEmpty.awaitNanos(nanos);
      }
      if (queue.size() > 0) {
        result = queue.poll();
        // Only wake a producer when a slot was actually freed.
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * @return the head of the queue without removing it, or {@code null} if the queue is empty
   */
  @Override
  public E peek() {
    lock.lock();
    try {
      return queue.peek();
    } finally {
      lock.unlock();
    }
  }

  /**
   * @return the number of elements currently in the queue
   */
  @Override
  public int size() {
    lock.lock();
    try {
      return queue.size();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Not supported.
   * @throws UnsupportedOperationException always
   */
  @Override
  public Iterator<E> iterator() {
    throw new UnsupportedOperationException();
  }

  /**
   * @return the comparator used to order the elements of this queue
   */
  public Comparator<? super E> comparator() {
    return queue.comparator();
  }

  /**
   * @return the number of additional elements this queue can accept without blocking
   */
  @Override
  public int remainingCapacity() {
    lock.lock();
    try {
      return queue.remainingCapacity();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Not supported.
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean remove(Object o) {
    throw new UnsupportedOperationException();
  }

  /**
   * @param o the object to look for
   * @return {@code true} if the queue holds at least one element {@code e} such that
   *         {@code o.equals(e)}
   */
  @Override
  public boolean contains(Object o) {
    lock.lock();
    try {
      return queue.contains(o);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removes all available elements from this queue and adds them to {@code c} in priority order.
   * @param c the collection to transfer elements into
   * @return the number of elements transferred
   * @throws NullPointerException     if {@code c} is null
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
  }

  /**
   * Removes at most {@code maxElements} elements from this queue and adds them to {@code c} in
   * priority order. Producers waiting for free capacity are woken up when at least one element
   * was removed.
   * @param c           the collection to transfer elements into
   * @param maxElements the maximum number of elements to transfer
   * @return the number of elements transferred
   * @throws NullPointerException     if {@code c} is null
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null) {
      throw new NullPointerException();
    }
    if (c == this) {
      throw new IllegalArgumentException();
    }
    if (maxElements <= 0) {
      return 0;
    }
    lock.lock();
    int removed = 0;
    try {
      int n = Math.min(queue.size(), maxElements);
      while (removed < n) {
        E elem = queue.poll();
        // Count before handing over so a throwing c.add() still leads to a wake-up below.
        removed++;
        c.add(elem);
      }
      return removed;
    } finally {
      if (removed > 0) {
        // Several slots may have been freed at once; let every waiting producer re-check.
        notFull.signalAll();
      }
      lock.unlock();
    }
  }
}