View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.ReentrantLock;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.TimeUnit;
26  import java.util.Collection;
27  import java.util.Comparator;
28  import java.util.Iterator;
29  import java.util.NoSuchElementException;
30  import java.util.AbstractQueue;
31  
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.classification.InterfaceStability;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  
38  /**
39   * A generic bounded blocking Priority-Queue.
40   *
41   * The elements of the priority queue are ordered according to the Comparator
42   * provided at queue construction time.
43   *
44   * If multiple elements have the same priority this queue orders them in
45   * FIFO (first-in-first-out) manner.
46   * The head of this queue is the least element with respect to the specified
47   * ordering. If multiple elements are tied for least value, the head is the
48   * first one inserted.
49   * The queue retrieval operations poll, remove, peek, and element access the
50   * element at the head of the queue.
51   */
52  @InterfaceAudience.Private
53  @InterfaceStability.Stable
54  public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
55    private static class PriorityQueue<E> {
56      private final Comparator<? super E> comparator;
57      private final E[] objects;
58  
59      private int head = 0;
60      private int tail = 0;
61  
62      @SuppressWarnings("unchecked")
63      public PriorityQueue(int capacity, Comparator<? super E> comparator) {
64        this.objects = (E[])new Object[capacity];
65        this.comparator = comparator;
66      }
67  
68      public void add(E elem) {
69        if (tail == objects.length) {
70          // shift down |-----AAAAAAA|
71          tail -= head;
72          System.arraycopy(objects, head, objects, 0, tail);
73          head = 0;
74        }
75  
76        if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
77          // Append
78          objects[tail++] = elem;
79        } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
80          // Prepend
81          objects[--head] = elem;
82        } else {
83          // Insert in the middle
84          int index = upperBound(head, tail - 1, elem);
85          System.arraycopy(objects, index, objects, index + 1, tail - index);
86          objects[index] = elem;
87          tail++;
88        }
89      }
90  
91      public E peek() {
92        return (head != tail) ? objects[head] : null;
93      }
94  
95      public E poll() {
96        E elem = objects[head];
97        head = (head + 1) % objects.length;
98        if (head == 0) tail = 0;
99        return elem;
100     }
101 
102     public int size() {
103       return tail - head;
104     }
105 
106     public Comparator<? super E> comparator() {
107       return this.comparator;
108     }
109 
110     public boolean contains(Object o) {
111       for (int i = head; i < tail; ++i) {
112         if (objects[i] == o) {
113           return true;
114         }
115       }
116       return false;
117     }
118 
119     public int remainingCapacity() {
120       return this.objects.length - (tail - head);
121     }
122 
123     private int upperBound(int start, int end, E key) {
124       while (start < end) {
125         int mid = (start + end) >>> 1;
126         E mitem = objects[mid];
127         int cmp = comparator.compare(mitem, key);
128         if (cmp > 0) {
129           end = mid;
130         } else {
131           start = mid + 1;
132         }
133       }
134       return start;
135     }
136   }
137 
138 
139   // Lock used for all operations
140   private final ReentrantLock lock = new ReentrantLock();
141 
142   // Condition for blocking when empty
143   private final Condition notEmpty = lock.newCondition();
144 
145   // Wait queue for waiting puts
146   private final Condition notFull = lock.newCondition();
147 
148   private final PriorityQueue<E> queue;
149 
150   /**
151    * Creates a PriorityQueue with the specified capacity that orders its
152    * elements according to the specified comparator.
153    * @param capacity the capacity of this queue
154    * @param comparator the comparator that will be used to order this priority queue
155    */
156   public BoundedPriorityBlockingQueue(int capacity,
157       Comparator<? super E> comparator) {
158     this.queue = new PriorityQueue<E>(capacity, comparator);
159   }
160 
161   public boolean offer(E e) {
162     if (e == null) throw new NullPointerException();
163 
164     lock.lock();
165     try {
166       if (queue.remainingCapacity() > 0) {
167         this.queue.add(e);
168         notEmpty.signal();
169         return true;
170       }
171     } finally {
172       lock.unlock();
173     }
174     return false;
175   }
176 
177   public void put(E e) throws InterruptedException {
178     if (e == null) throw new NullPointerException();
179 
180     lock.lock();
181     try {
182       while (queue.remainingCapacity() == 0) {
183         notFull.await();
184       }
185       this.queue.add(e);
186       notEmpty.signal();
187     } finally {
188       lock.unlock();
189     }
190   }
191 
192   public boolean offer(E e, long timeout, TimeUnit unit)
193       throws InterruptedException {
194     if (e == null) throw new NullPointerException();
195     long nanos = unit.toNanos(timeout);
196 
197     lock.lockInterruptibly();
198     try {
199       while (queue.remainingCapacity() == 0) {
200         if (nanos <= 0)
201           return false;
202         nanos = notFull.awaitNanos(nanos);
203       }
204       this.queue.add(e);
205       notEmpty.signal();
206     } finally {
207       lock.unlock();
208     }
209     return true;
210   }
211 
212   public E take() throws InterruptedException {
213     E result = null;
214     lock.lockInterruptibly();
215     try {
216       while (queue.size() == 0) {
217         notEmpty.await();
218       }
219       result = queue.poll();
220       notFull.signal();
221     } finally {
222       lock.unlock();
223     }
224     return result;
225   }
226 
227   public E poll() {
228     E result = null;
229     lock.lock();
230     try {
231       if (queue.size() > 0) {
232         result = queue.poll();
233         notFull.signal();
234       }
235     } finally {
236       lock.unlock();
237     }
238     return result;
239   }
240 
241   public E poll(long timeout, TimeUnit unit)
242       throws InterruptedException {
243     long nanos = unit.toNanos(timeout);
244     lock.lockInterruptibly();
245     E result = null;
246     try {
247       while (queue.size() == 0 && nanos > 0) {
248         nanos = notEmpty.awaitNanos(nanos);
249       }
250       if (queue.size() > 0) {
251         result = queue.poll();
252       }
253       notFull.signal();
254     } finally {
255       lock.unlock();
256     }
257     return result;
258   }
259 
260   public E peek() {
261     lock.lock();
262     try {
263       return queue.peek();
264     } finally {
265       lock.unlock();
266     }
267   }
268 
269   public int size() {
270     lock.lock();
271     try {
272       return queue.size();
273     } finally {
274       lock.unlock();
275     }
276   }
277 
278   public Iterator<E> iterator() {
279     throw new UnsupportedOperationException();
280   }
281 
282   public Comparator<? super E> comparator() {
283     return queue.comparator();
284   }
285 
286   public int remainingCapacity() {
287     lock.lock();
288     try {
289       return queue.remainingCapacity();
290     } finally {
291       lock.unlock();
292     }
293   }
294 
295   public boolean remove(Object o) {
296     throw new UnsupportedOperationException();
297   }
298 
299   public boolean contains(Object o) {
300     lock.lock();
301     try {
302       return queue.contains(o);
303     } finally {
304       lock.unlock();
305     }
306   }
307 
308   public int drainTo(Collection<? super E> c) {
309     return drainTo(c, Integer.MAX_VALUE);
310   }
311 
312   public int drainTo(Collection<? super E> c, int maxElements) {
313     if (c == null)
314         throw new NullPointerException();
315     if (c == this)
316         throw new IllegalArgumentException();
317     if (maxElements <= 0)
318         return 0;
319     lock.lock();
320     try {
321       int n = Math.min(queue.size(), maxElements);
322       for (int i = 0; i < n; ++i) {
323         c.add(queue.poll());
324       }
325       return n;
326     } finally {
327       lock.unlock();
328     }
329   }
330 }