001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.util.AbstractQueue; 021import java.util.Collection; 022import java.util.Comparator; 023import java.util.Iterator; 024import java.util.Objects; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.locks.Condition; 028import java.util.concurrent.locks.ReentrantLock; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.apache.yetus.audience.InterfaceStability; 031 032/** 033 * A generic bounded blocking Priority-Queue. The elements of the priority queue are ordered 034 * according to the Comparator provided at queue construction time. If multiple elements have the 035 * same priority this queue orders them in FIFO (first-in-first-out) manner. The head of this queue 036 * is the least element with respect to the specified ordering. If multiple elements are tied for 037 * least value, the head is the first one inserted. The queue retrieval operations poll, remove, 038 * peek, and element access the element at the head of the queue. 039 */ 040@InterfaceAudience.Private 041@InterfaceStability.Stable 042public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { 043 private static class PriorityQueue<E> { 044 private final Comparator<? super E> comparator; 045 private final E[] objects; 046 047 private int head = 0; 048 private int tail = 0; 049 050 @SuppressWarnings("unchecked") 051 public PriorityQueue(int capacity, Comparator<? super E> comparator) { 052 this.objects = (E[]) new Object[capacity]; 053 this.comparator = comparator; 054 } 055 056 public void add(E elem) { 057 if (tail == objects.length) { 058 // shift down |-----AAAAAAA| 059 tail -= head; 060 System.arraycopy(objects, head, objects, 0, tail); 061 head = 0; 062 } 063 064 if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) { 065 // Append 066 objects[tail++] = elem; 067 } else if (head > 0 && comparator.compare(objects[head], elem) > 0) { 068 // Prepend 069 objects[--head] = elem; 070 } else { 071 // Insert in the middle 072 int index = upperBound(head, tail - 1, elem); 073 System.arraycopy(objects, index, objects, index + 1, tail - index); 074 objects[index] = elem; 075 tail++; 076 } 077 } 078 079 public E peek() { 080 return (head != tail) ? objects[head] : null; 081 } 082 083 public E poll() { 084 E elem = objects[head]; 085 objects[head] = null; 086 head = (head + 1) % objects.length; 087 if (head == 0) tail = 0; 088 return elem; 089 } 090 091 public int size() { 092 return tail - head; 093 } 094 095 public Comparator<? super E> comparator() { 096 return this.comparator; 097 } 098 099 public boolean contains(Object o) { 100 for (int i = head; i < tail; ++i) { 101 if (objects[i] == o) { 102 return true; 103 } 104 } 105 return false; 106 } 107 108 public int remainingCapacity() { 109 return this.objects.length - (tail - head); 110 } 111 112 private int upperBound(int start, int end, E key) { 113 while (start < end) { 114 int mid = start + ((end - start) >> 1); 115 E mitem = objects[mid]; 116 int cmp = comparator.compare(mitem, key); 117 if (cmp > 0) { 118 end = mid; 119 } else { 120 start = mid + 1; 121 } 122 } 123 return start; 124 } 125 } 126 127 // Lock used for all operations 128 private final ReentrantLock lock = new ReentrantLock(); 129 130 // Condition for blocking when empty 131 private final Condition notEmpty = lock.newCondition(); 132 133 // Wait queue for waiting puts 134 private final Condition notFull = lock.newCondition(); 135 136 private final PriorityQueue<E> queue; 137 138 /** 139 * Creates a PriorityQueue with the specified capacity that orders its elements according to the 140 * specified comparator. 141 * @param capacity the capacity of this queue 142 * @param comparator the comparator that will be used to order this priority queue 143 */ 144 public BoundedPriorityBlockingQueue(int capacity, Comparator<? super E> comparator) { 145 this.queue = new PriorityQueue<>(capacity, comparator); 146 } 147 148 @Override 149 public boolean offer(E e) { 150 Objects.requireNonNull(e); 151 152 lock.lock(); 153 try { 154 if (queue.remainingCapacity() > 0) { 155 this.queue.add(e); 156 notEmpty.signal(); 157 return true; 158 } 159 } finally { 160 lock.unlock(); 161 } 162 return false; 163 } 164 165 @Override 166 public void put(E e) throws InterruptedException { 167 Objects.requireNonNull(e); 168 169 lock.lock(); 170 try { 171 while (queue.remainingCapacity() == 0) { 172 notFull.await(); 173 } 174 this.queue.add(e); 175 notEmpty.signal(); 176 } finally { 177 lock.unlock(); 178 } 179 } 180 181 @Override 182 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 183 Objects.requireNonNull(e); 184 long nanos = unit.toNanos(timeout); 185 186 lock.lockInterruptibly(); 187 try { 188 while (queue.remainingCapacity() == 0) { 189 if (nanos <= 0) return false; 190 nanos = notFull.awaitNanos(nanos); 191 } 192 this.queue.add(e); 193 notEmpty.signal(); 194 } finally { 195 lock.unlock(); 196 } 197 return true; 198 } 199 200 @Override 201 public E take() throws InterruptedException { 202 E result = null; 203 lock.lockInterruptibly(); 204 try { 205 while (queue.size() == 0) { 206 notEmpty.await(); 207 } 208 result = queue.poll(); 209 notFull.signal(); 210 } finally { 211 lock.unlock(); 212 } 213 return result; 214 } 215 216 @Override 217 public E poll() { 218 E result = null; 219 lock.lock(); 220 try { 221 if (queue.size() > 0) { 222 result = queue.poll(); 223 notFull.signal(); 224 } 225 } finally { 226 lock.unlock(); 227 } 228 return result; 229 } 230 231 @Override 232 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 233 long nanos = unit.toNanos(timeout); 234 lock.lockInterruptibly(); 235 E result = null; 236 try { 237 while (queue.size() == 0 && nanos > 0) { 238 nanos = notEmpty.awaitNanos(nanos); 239 } 240 if (queue.size() > 0) { 241 result = queue.poll(); 242 } 243 notFull.signal(); 244 } finally { 245 lock.unlock(); 246 } 247 return result; 248 } 249 250 @Override 251 public E peek() { 252 lock.lock(); 253 try { 254 return queue.peek(); 255 } finally { 256 lock.unlock(); 257 } 258 } 259 260 @Override 261 public int size() { 262 lock.lock(); 263 try { 264 return queue.size(); 265 } finally { 266 lock.unlock(); 267 } 268 } 269 270 @Override 271 public Iterator<E> iterator() { 272 throw new UnsupportedOperationException(); 273 } 274 275 public Comparator<? super E> comparator() { 276 return queue.comparator(); 277 } 278 279 @Override 280 public int remainingCapacity() { 281 lock.lock(); 282 try { 283 return queue.remainingCapacity(); 284 } finally { 285 lock.unlock(); 286 } 287 } 288 289 @Override 290 public boolean remove(Object o) { 291 throw new UnsupportedOperationException(); 292 } 293 294 @Override 295 public boolean contains(Object o) { 296 lock.lock(); 297 try { 298 return queue.contains(o); 299 } finally { 300 lock.unlock(); 301 } 302 } 303 304 @Override 305 public int drainTo(Collection<? super E> c) { 306 return drainTo(c, Integer.MAX_VALUE); 307 } 308 309 @Override 310 public int drainTo(Collection<? super E> c, int maxElements) { 311 Objects.requireNonNull(c); 312 if (c == this) throw new IllegalArgumentException(); 313 if (maxElements <= 0) return 0; 314 lock.lock(); 315 try { 316 int n = Math.min(queue.size(), maxElements); 317 for (int i = 0; i < n; ++i) { 318 c.add(queue.poll()); 319 } 320 return n; 321 } finally { 322 lock.unlock(); 323 } 324 } 325}