1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import org.apache.hadoop.hbase.classification.InterfaceAudience;
22  
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.PriorityBlockingQueue;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.locks.Condition;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  @InterfaceAudience.Private
42  public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
43  
44    private BlockingQueue<T> stealFromQueue;
45  
46    private final Lock lock = new ReentrantLock();
47    private final Condition notEmpty = lock.newCondition();
48  
49    public StealJobQueue() {
50      this.stealFromQueue = new PriorityBlockingQueue<T>() {
51        @Override
52        public boolean offer(T t) {
53          lock.lock();
54          try {
55            notEmpty.signal();
56            return super.offer(t);
57          } finally {
58            lock.unlock();
59          }
60        }
61      };
62    }
63  
64    public BlockingQueue<T> getStealFromQueue() {
65      return stealFromQueue;
66    }
67  
68    @Override
69    public boolean offer(T t) {
70      lock.lock();
71      try {
72        notEmpty.signal();
73        return super.offer(t);
74      } finally {
75        lock.unlock();
76      }
77    }
78  
79  
80    @Override
81    public T take() throws InterruptedException {
82      lock.lockInterruptibly();
83      try {
84        while (true) {
85          T retVal = this.poll();
86          if (retVal == null) {
87            retVal = stealFromQueue.poll();
88          }
89          if (retVal == null) {
90            notEmpty.await();
91          } else {
92            return retVal;
93          }
94        }
95      } finally {
96        lock.unlock();
97      }
98    }
99  
100   @Override
101   public T poll(long timeout, TimeUnit unit) throws InterruptedException {
102     long nanos = unit.toNanos(timeout);
103     lock.lockInterruptibly();
104     try {
105       while (true) {
106         T retVal = this.poll();
107         if (retVal == null) {
108           retVal = stealFromQueue.poll();
109         }
110         if (retVal == null) {
111           if (nanos <= 0)
112             return null;
113           nanos = notEmpty.awaitNanos(nanos);
114         } else {
115           return retVal;
116         }
117       }
118     } finally {
119       lock.unlock();
120     }
121   }
122 }
123