View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.ReentrantLock;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.TimeUnit;
26  import java.util.Collection;
27  import java.util.Comparator;
28  import java.util.Iterator;
29  import java.util.NoSuchElementException;
30  import java.util.AbstractQueue;
31  
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.classification.InterfaceStability;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  
38  /**
39   * A generic bounded blocking Priority-Queue.
40   *
41   * The elements of the priority queue are ordered according to the Comparator
42   * provided at queue construction time.
43   *
44   * If multiple elements have the same priority this queue orders them in
45   * FIFO (first-in-first-out) manner.
46   * The head of this queue is the least element with respect to the specified
47   * ordering. If multiple elements are tied for least value, the head is the
48   * first one inserted.
49   * The queue retrieval operations poll, remove, peek, and element access the
50   * element at the head of the queue.
51   */
52  @InterfaceAudience.Private
53  @InterfaceStability.Stable
public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
  /**
   * Fixed-capacity, array-backed buffer that keeps its elements sorted.
   *
   * Live elements occupy {@code objects[head, tail)} in ascending comparator
   * order, so the head of the queue is {@code objects[head]}. Elements that
   * compare equal keep their insertion order. This class performs no locking
   * and no capacity checks of its own; the enclosing queue does both.
   */
  private static class PriorityQueue<E> {
    private final Comparator<? super E> comparator;
    private final E[] objects;

    // Index of the first live element.
    private int head = 0;
    // Index one past the last live element.
    private int tail = 0;

    @SuppressWarnings("unchecked")
    public PriorityQueue(int capacity, Comparator<? super E> comparator) {
      // Safe: the array never escapes this class and only ever stores E.
      this.objects = (E[])new Object[capacity];
      this.comparator = comparator;
    }

    /**
     * Inserts {@code elem} at its sorted position, after any elements that
     * compare equal to it. The caller must have checked that
     * {@link #remainingCapacity()} is greater than zero.
     */
    public void add(E elem) {
      if (tail == objects.length) {
        // shift down |-----AAAAAAA|
        int oldTail = tail;
        tail -= head;
        System.arraycopy(objects, head, objects, 0, tail);
        // Drop the duplicate references left behind in the vacated slots so
        // that elements polled later do not stay reachable through them.
        for (int i = tail; i < oldTail; ++i) {
          objects[i] = null;
        }
        head = 0;
      }

      if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
        // Append: empty, or elem is not smaller than the current last element
        objects[tail++] = elem;
      } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
        // Prepend: there is a free slot in front and elem is strictly smallest
        objects[--head] = elem;
      } else {
        // Insert in the middle. objects[tail - 1] is known to be greater than
        // elem here, so the search range can stop at tail - 1.
        int index = upperBound(head, tail - 1, elem);
        System.arraycopy(objects, index, objects, index + 1, tail - index);
        objects[index] = elem;
        tail++;
      }
    }

    /** Returns the head element without removing it, or null when empty. */
    public E peek() {
      return (head != tail) ? objects[head] : null;
    }

    /**
     * Removes and returns the head element. The caller must have checked that
     * the buffer is not empty.
     */
    public E poll() {
      E elem = objects[head];
      objects[head] = null;
      head = (head + 1) % objects.length;
      // head wraps to 0 only after the element in the last slot was removed,
      // at which point the buffer is empty and both indexes can be reset.
      if (head == 0) tail = 0;
      return elem;
    }

    /** Returns the number of live elements. */
    public int size() {
      return tail - head;
    }

    public Comparator<? super E> comparator() {
      return this.comparator;
    }

    /**
     * Returns true if some live element is equal to {@code o} according to
     * {@code o.equals(...)}. A null argument never matches.
     */
    public boolean contains(Object o) {
      if (o == null) {
        return false;
      }
      for (int i = head; i < tail; ++i) {
        if (o.equals(objects[i])) {
          return true;
        }
      }
      return false;
    }

    /** Returns how many more elements can be added before the buffer is full. */
    public int remainingCapacity() {
      return this.objects.length - (tail - head);
    }

    /**
     * Binary search over {@code [start, end)} for the first index whose
     * element compares greater than {@code key}; returns {@code end} when no
     * such element exists in the range.
     */
    private int upperBound(int start, int end, E key) {
      while (start < end) {
        int mid = (start + end) >>> 1;
        E mitem = objects[mid];
        int cmp = comparator.compare(mitem, key);
        if (cmp > 0) {
          end = mid;
        } else {
          start = mid + 1;
        }
      }
      return start;
    }
  }


  // Lock used for all operations
  private final ReentrantLock lock = new ReentrantLock();

  // Condition for blocking when empty
  private final Condition notEmpty = lock.newCondition();

  // Wait queue for waiting puts
  private final Condition notFull = lock.newCondition();

  private final PriorityQueue<E> queue;

  /**
   * Creates a PriorityQueue with the specified capacity that orders its
   * elements according to the specified comparator.
   * @param capacity the capacity of this queue
   * @param comparator the comparator that will be used to order this priority queue
   */
  public BoundedPriorityBlockingQueue(int capacity,
      Comparator<? super E> comparator) {
    this.queue = new PriorityQueue<E>(capacity, comparator);
  }

  /**
   * Inserts the element if there is room, without waiting.
   * @param e the element to add, must not be null
   * @return true if the element was added, false if the queue is full
   * @throws NullPointerException if {@code e} is null
   */
  @Override
  public boolean offer(E e) {
    if (e == null) throw new NullPointerException();

    lock.lock();
    try {
      if (queue.remainingCapacity() > 0) {
        this.queue.add(e);
        notEmpty.signal();
        return true;
      }
    } finally {
      lock.unlock();
    }
    return false;
  }

  /**
   * Inserts the element, waiting for space to become available if the queue
   * is full.
   * @param e the element to add, must not be null
   * @throws NullPointerException if {@code e} is null
   * @throws InterruptedException if interrupted while waiting for space
   */
  @Override
  public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();

    lock.lock();
    try {
      while (queue.remainingCapacity() == 0) {
        notFull.await();
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the element, waiting up to the given time for space to become
   * available.
   * @param e the element to add, must not be null
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit the time unit of {@code timeout}
   * @return true if the element was added, false if the wait timed out
   * @throws NullPointerException if {@code e} is null
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public boolean offer(E e, long timeout, TimeUnit unit)
      throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);

    lock.lockInterruptibly();
    try {
      while (queue.remainingCapacity() == 0) {
        if (nanos <= 0)
          return false;
        nanos = notFull.awaitNanos(nanos);
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
    return true;
  }

  /**
   * Removes and returns the head of the queue, waiting until an element is
   * available.
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E take() throws InterruptedException {
    E result = null;
    lock.lockInterruptibly();
    try {
      while (queue.size() == 0) {
        notEmpty.await();
      }
      result = queue.poll();
      notFull.signal();
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue without waiting.
   * @return the head element, or null if the queue is empty
   */
  @Override
  public E poll() {
    E result = null;
    lock.lock();
    try {
      if (queue.size() > 0) {
        result = queue.poll();
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue, waiting up to the given time
   * for an element to become available.
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit the time unit of {@code timeout}
   * @return the head element, or null if the wait timed out
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    E result = null;
    try {
      while (queue.size() == 0 && nanos > 0) {
        nanos = notEmpty.awaitNanos(nanos);
      }
      if (queue.size() > 0) {
        result = queue.poll();
        // Only wake a producer when a slot was actually freed.
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Returns the head of the queue without removing it, or null if empty.
   */
  @Override
  public E peek() {
    lock.lock();
    try {
      return queue.peek();
    } finally {
      lock.unlock();
    }
  }

  /** Returns the number of elements currently in the queue. */
  @Override
  public int size() {
    lock.lock();
    try {
      return queue.size();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Iteration is not supported.
   * @throws UnsupportedOperationException always
   */
  @Override
  public Iterator<E> iterator() {
    throw new UnsupportedOperationException();
  }

  /** Returns the comparator supplied at construction time. */
  public Comparator<? super E> comparator() {
    return queue.comparator();
  }

  /** Returns how many more elements the queue can accept before it is full. */
  @Override
  public int remainingCapacity() {
    lock.lock();
    try {
      return queue.remainingCapacity();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removal of an arbitrary element is not supported.
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean remove(Object o) {
    throw new UnsupportedOperationException();
  }

  /**
   * Returns true if the queue holds an element equal to {@code o}.
   */
  @Override
  public boolean contains(Object o) {
    lock.lock();
    try {
      return queue.contains(o);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Moves every queued element, in priority order, into {@code c}.
   * @return the number of elements transferred
   */
  @Override
  public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
  }

  /**
   * Moves at most {@code maxElements} elements, in priority order, into
   * {@code c} and wakes any producers that were waiting for space.
   * @param c the collection to transfer elements into
   * @param maxElements the maximum number of elements to transfer
   * @return the number of elements transferred
   * @throws NullPointerException if {@code c} is null
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    lock.lock();
    int drained = 0;
    try {
      int n = Math.min(queue.size(), maxElements);
      while (drained < n) {
        // Add before removing so that a sink that throws does not lose the
        // element: it is still at the head of this queue.
        c.add(queue.peek());
        queue.poll();
        drained++;
      }
      return drained;
    } finally {
      // Space was freed; producers blocked in put()/offer(timeout) must be
      // told, otherwise they keep waiting on a queue that is no longer full.
      if (drained > 0) {
        notFull.signalAll();
      }
      lock.unlock();
    }
  }
}