View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.util;
20  
21  import java.util.concurrent.locks.Condition;
22  import java.util.concurrent.locks.ReentrantLock;
23  import java.util.concurrent.TimeUnit;
24  
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  
28  @InterfaceAudience.Private
29  @InterfaceStability.Evolving
30  public class TimeoutBlockingQueue<E> {
31    public static interface TimeoutRetriever<T> {
32      long getTimeout(T object);
33      TimeUnit getTimeUnit(T object);
34    }
35  
36    private final ReentrantLock lock = new ReentrantLock();
37    private final Condition waitCond = lock.newCondition();
38    private final TimeoutRetriever<? super E> timeoutRetriever;
39  
40    private E[] objects;
41    private int head = 0;
42    private int tail = 0;
43  
44    public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
45      this(32, timeoutRetriever);
46    }
47  
48    @SuppressWarnings("unchecked")
49    public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever) {
50      this.objects = (E[])new Object[capacity];
51      this.timeoutRetriever = timeoutRetriever;
52    }
53  
54    public void dump() {
55      for (int i = 0; i < objects.length; ++i) {
56        if (i == head) {
57          System.out.print("[" + objects[i] + "] ");
58        } else if (i == tail) {
59          System.out.print("]" + objects[i] + "[ ");
60        } else {
61          System.out.print(objects[i] + " ");
62        }
63      }
64      System.out.println();
65    }
66  
67    public void clear() {
68      lock.lock();
69      try {
70        if (head != tail) {
71          for (int i = head; i < tail; ++i) {
72            objects[i] = null;
73          }
74          head = 0;
75          tail = 0;
76          waitCond.signal();
77        }
78      } finally {
79        lock.unlock();
80      }
81    }
82  
83    public void add(E e) {
84      if (e == null) throw new NullPointerException();
85  
86      lock.lock();
87      try {
88        addElement(e);
89        waitCond.signal();
90      } finally {
91        lock.unlock();
92      }
93    }
94  
95    @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
96    public E poll() {
97      lock.lock();
98      try {
99        if (isEmpty()) {
100         waitCond.await();
101         return null;
102       }
103 
104       E elem = objects[head];
105       long nanos = getNanosTimeout(elem);
106       nanos = waitCond.awaitNanos(nanos);
107       return nanos > 0 ? null : removeFirst();
108     } catch (InterruptedException e) {
109       Thread.currentThread().interrupt();
110       return null;
111     } finally {
112       lock.unlock();
113     }
114   }
115 
116   public int size() {
117     return tail - head;
118   }
119 
120   public boolean isEmpty() {
121     return (tail - head) == 0;
122   }
123 
124   public void signalAll() {
125     lock.lock();
126     try {
127       waitCond.signalAll();
128     } finally {
129       lock.unlock();
130     }
131   }
132 
133   private void addElement(E elem) {
134     int size = (tail - head);
135     if ((objects.length - size) == 0) {
136       int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
137       E[] newObjects = (E[])new Object[capacity];
138 
139       if (compareTimeouts(objects[tail - 1], elem) <= 0) {
140         // Append
141         System.arraycopy(objects, head, newObjects, 0, tail);
142         tail -= head;
143         newObjects[tail++] = elem;
144       } else if (compareTimeouts(objects[head], elem) > 0) {
145         // Prepend
146         System.arraycopy(objects, head, newObjects, 1, tail);
147         newObjects[0] = elem;
148         tail -= (head - 1);
149       } else {
150         // Insert in the middle
151         int index = upperBound(head, tail - 1, elem);
152         int newIndex = (index - head);
153         System.arraycopy(objects, head, newObjects, 0, newIndex);
154         newObjects[newIndex] = elem;
155         System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index);
156         tail -= (head - 1);
157       }
158       head = 0;
159       objects = newObjects;
160     } else {
161       if (tail == objects.length) {
162         // shift down |-----AAAAAAA|
163         tail -= head;
164         System.arraycopy(objects, head, objects, 0, tail);
165         head = 0;
166       }
167 
168       if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
169         // Append
170         objects[tail++] = elem;
171       } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
172         // Prepend
173         objects[--head] = elem;
174       } else {
175         // Insert in the middle
176         int index = upperBound(head, tail - 1, elem);
177         System.arraycopy(objects, index, objects, index + 1, tail - index);
178         objects[index] = elem;
179         tail++;
180       }
181     }
182   }
183 
184   private E removeFirst() {
185     E elem = objects[head];
186     objects[head] = null;
187     head = (head + 1) % objects.length;
188     if (head == 0) tail = 0;
189     return elem;
190   }
191 
192   private int upperBound(int start, int end, E key) {
193     while (start < end) {
194       int mid = (start + end) >>> 1;
195       E mitem = objects[mid];
196       int cmp = compareTimeouts(mitem, key);
197       if (cmp > 0) {
198         end = mid;
199       } else {
200         start = mid + 1;
201       }
202     }
203     return start;
204   }
205 
206   private int compareTimeouts(final E a, final E b) {
207     long t1 = getNanosTimeout(a);
208     long t2 = getNanosTimeout(b);
209     return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
210   }
211 
212   private long getNanosTimeout(final E obj) {
213     TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
214     long timeout = timeoutRetriever.getTimeout(obj);
215     return unit.toNanos(timeout);
216   }
217 }