001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.util; 020 021import java.util.concurrent.locks.Condition; 022import java.util.concurrent.locks.ReentrantLock; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.TimeUnit; 025import java.util.Collection; 026import java.util.Comparator; 027import java.util.Iterator; 028import java.util.AbstractQueue; 029 030import org.apache.yetus.audience.InterfaceAudience; 031import org.apache.yetus.audience.InterfaceStability; 032 033 034/** 035 * A generic bounded blocking Priority-Queue. 036 * 037 * The elements of the priority queue are ordered according to the Comparator 038 * provided at queue construction time. 039 * 040 * If multiple elements have the same priority this queue orders them in 041 * FIFO (first-in-first-out) manner. 042 * The head of this queue is the least element with respect to the specified 043 * ordering. If multiple elements are tied for least value, the head is the 044 * first one inserted. 045 * The queue retrieval operations poll, remove, peek, and element access the 046 * element at the head of the queue. 047 */ 048@InterfaceAudience.Private 049@InterfaceStability.Stable 050public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { 051 private static class PriorityQueue<E> { 052 private final Comparator<? super E> comparator; 053 private final E[] objects; 054 055 private int head = 0; 056 private int tail = 0; 057 058 @SuppressWarnings("unchecked") 059 public PriorityQueue(int capacity, Comparator<? super E> comparator) { 060 this.objects = (E[])new Object[capacity]; 061 this.comparator = comparator; 062 } 063 064 public void add(E elem) { 065 if (tail == objects.length) { 066 // shift down |-----AAAAAAA| 067 tail -= head; 068 System.arraycopy(objects, head, objects, 0, tail); 069 head = 0; 070 } 071 072 if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) { 073 // Append 074 objects[tail++] = elem; 075 } else if (head > 0 && comparator.compare(objects[head], elem) > 0) { 076 // Prepend 077 objects[--head] = elem; 078 } else { 079 // Insert in the middle 080 int index = upperBound(head, tail - 1, elem); 081 System.arraycopy(objects, index, objects, index + 1, tail - index); 082 objects[index] = elem; 083 tail++; 084 } 085 } 086 087 public E peek() { 088 return (head != tail) ? objects[head] : null; 089 } 090 091 public E poll() { 092 E elem = objects[head]; 093 objects[head] = null; 094 head = (head + 1) % objects.length; 095 if (head == 0) tail = 0; 096 return elem; 097 } 098 099 public int size() { 100 return tail - head; 101 } 102 103 public Comparator<? super E> comparator() { 104 return this.comparator; 105 } 106 107 public boolean contains(Object o) { 108 for (int i = head; i < tail; ++i) { 109 if (objects[i] == o) { 110 return true; 111 } 112 } 113 return false; 114 } 115 116 public int remainingCapacity() { 117 return this.objects.length - (tail - head); 118 } 119 120 private int upperBound(int start, int end, E key) { 121 while (start < end) { 122 int mid = (start + end) >>> 1; 123 E mitem = objects[mid]; 124 int cmp = comparator.compare(mitem, key); 125 if (cmp > 0) { 126 end = mid; 127 } else { 128 start = mid + 1; 129 } 130 } 131 return start; 132 } 133 } 134 135 136 // Lock used for all operations 137 private final ReentrantLock lock = new ReentrantLock(); 138 139 // Condition for blocking when empty 140 private final Condition notEmpty = lock.newCondition(); 141 142 // Wait queue for waiting puts 143 private final Condition notFull = lock.newCondition(); 144 145 private final PriorityQueue<E> queue; 146 147 /** 148 * Creates a PriorityQueue with the specified capacity that orders its 149 * elements according to the specified comparator. 150 * @param capacity the capacity of this queue 151 * @param comparator the comparator that will be used to order this priority queue 152 */ 153 public BoundedPriorityBlockingQueue(int capacity, 154 Comparator<? super E> comparator) { 155 this.queue = new PriorityQueue<>(capacity, comparator); 156 } 157 158 @Override 159 public boolean offer(E e) { 160 if (e == null) throw new NullPointerException(); 161 162 lock.lock(); 163 try { 164 if (queue.remainingCapacity() > 0) { 165 this.queue.add(e); 166 notEmpty.signal(); 167 return true; 168 } 169 } finally { 170 lock.unlock(); 171 } 172 return false; 173 } 174 175 @Override 176 public void put(E e) throws InterruptedException { 177 if (e == null) throw new NullPointerException(); 178 179 lock.lock(); 180 try { 181 while (queue.remainingCapacity() == 0) { 182 notFull.await(); 183 } 184 this.queue.add(e); 185 notEmpty.signal(); 186 } finally { 187 lock.unlock(); 188 } 189 } 190 191 @Override 192 public boolean offer(E e, long timeout, TimeUnit unit) 193 throws InterruptedException { 194 if (e == null) throw new NullPointerException(); 195 long nanos = unit.toNanos(timeout); 196 197 lock.lockInterruptibly(); 198 try { 199 while (queue.remainingCapacity() == 0) { 200 if (nanos <= 0) 201 return false; 202 nanos = notFull.awaitNanos(nanos); 203 } 204 this.queue.add(e); 205 notEmpty.signal(); 206 } finally { 207 lock.unlock(); 208 } 209 return true; 210 } 211 212 @Override 213 public E take() throws InterruptedException { 214 E result = null; 215 lock.lockInterruptibly(); 216 try { 217 while (queue.size() == 0) { 218 notEmpty.await(); 219 } 220 result = queue.poll(); 221 notFull.signal(); 222 } finally { 223 lock.unlock(); 224 } 225 return result; 226 } 227 228 @Override 229 public E poll() { 230 E result = null; 231 lock.lock(); 232 try { 233 if (queue.size() > 0) { 234 result = queue.poll(); 235 notFull.signal(); 236 } 237 } finally { 238 lock.unlock(); 239 } 240 return result; 241 } 242 243 @Override 244 public E poll(long timeout, TimeUnit unit) 245 throws InterruptedException { 246 long nanos = unit.toNanos(timeout); 247 lock.lockInterruptibly(); 248 E result = null; 249 try { 250 while (queue.size() == 0 && nanos > 0) { 251 nanos = notEmpty.awaitNanos(nanos); 252 } 253 if (queue.size() > 0) { 254 result = queue.poll(); 255 } 256 notFull.signal(); 257 } finally { 258 lock.unlock(); 259 } 260 return result; 261 } 262 263 @Override 264 public E peek() { 265 lock.lock(); 266 try { 267 return queue.peek(); 268 } finally { 269 lock.unlock(); 270 } 271 } 272 273 @Override 274 public int size() { 275 lock.lock(); 276 try { 277 return queue.size(); 278 } finally { 279 lock.unlock(); 280 } 281 } 282 283 @Override 284 public Iterator<E> iterator() { 285 throw new UnsupportedOperationException(); 286 } 287 288 public Comparator<? super E> comparator() { 289 return queue.comparator(); 290 } 291 292 @Override 293 public int remainingCapacity() { 294 lock.lock(); 295 try { 296 return queue.remainingCapacity(); 297 } finally { 298 lock.unlock(); 299 } 300 } 301 302 @Override 303 public boolean remove(Object o) { 304 throw new UnsupportedOperationException(); 305 } 306 307 @Override 308 public boolean contains(Object o) { 309 lock.lock(); 310 try { 311 return queue.contains(o); 312 } finally { 313 lock.unlock(); 314 } 315 } 316 317 @Override 318 public int drainTo(Collection<? super E> c) { 319 return drainTo(c, Integer.MAX_VALUE); 320 } 321 322 @Override 323 public int drainTo(Collection<? super E> c, int maxElements) { 324 if (c == null) 325 throw new NullPointerException(); 326 if (c == this) 327 throw new IllegalArgumentException(); 328 if (maxElements <= 0) 329 return 0; 330 lock.lock(); 331 try { 332 int n = Math.min(queue.size(), maxElements); 333 for (int i = 0; i < n; ++i) { 334 c.add(queue.poll()); 335 } 336 return n; 337 } finally { 338 lock.unlock(); 339 } 340 } 341}