1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import org.apache.hadoop.hbase.classification.InterfaceAudience;
22
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.PriorityBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30
31
32
33
34
35
36
37
38
39
40
41 @InterfaceAudience.Private
42 public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
43
44 private BlockingQueue<T> stealFromQueue;
45
46 private final Lock lock = new ReentrantLock();
47 private final Condition notEmpty = lock.newCondition();
48
49 public StealJobQueue() {
50 this.stealFromQueue = new PriorityBlockingQueue<T>() {
51 @Override
52 public boolean offer(T t) {
53 lock.lock();
54 try {
55 notEmpty.signal();
56 return super.offer(t);
57 } finally {
58 lock.unlock();
59 }
60 }
61 };
62 }
63
64 public BlockingQueue<T> getStealFromQueue() {
65 return stealFromQueue;
66 }
67
68 @Override
69 public boolean offer(T t) {
70 lock.lock();
71 try {
72 notEmpty.signal();
73 return super.offer(t);
74 } finally {
75 lock.unlock();
76 }
77 }
78
79
80 @Override
81 public T take() throws InterruptedException {
82 lock.lockInterruptibly();
83 try {
84 while (true) {
85 T retVal = this.poll();
86 if (retVal == null) {
87 retVal = stealFromQueue.poll();
88 }
89 if (retVal == null) {
90 notEmpty.await();
91 } else {
92 return retVal;
93 }
94 }
95 } finally {
96 lock.unlock();
97 }
98 }
99
100 @Override
101 public T poll(long timeout, TimeUnit unit) throws InterruptedException {
102 long nanos = unit.toNanos(timeout);
103 lock.lockInterruptibly();
104 try {
105 while (true) {
106 T retVal = this.poll();
107 if (retVal == null) {
108 retVal = stealFromQueue.poll();
109 }
110 if (retVal == null) {
111 if (nanos <= 0)
112 return null;
113 nanos = notEmpty.awaitNanos(nanos);
114 } else {
115 return retVal;
116 }
117 }
118 } finally {
119 lock.unlock();
120 }
121 }
122 }
123