1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.util.Map;
22
23 import java.util.concurrent.locks.ReentrantLock;
24 import java.util.concurrent.ConcurrentSkipListMap;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 @InterfaceAudience.Private
46 @InterfaceStability.Evolving
47 public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
48 private ConcurrentSkipListMap<TKey, TQueue> objMap =
49 new ConcurrentSkipListMap<TKey, TQueue>();
50
51 private final ReentrantLock lock = new ReentrantLock();
52 private final int quantum;
53
54 private Map.Entry<TKey, TQueue> current = null;
55 private int currentQuantum = 0;
56
57 public interface FairObject {
58 boolean isAvailable();
59 int getPriority();
60 }
61
62
63
64
65 public ProcedureFairRunQueues(final int quantum) {
66 this.quantum = quantum;
67 }
68
69 public TQueue get(final TKey key) {
70 return objMap.get(key);
71 }
72
73 public TQueue add(final TKey key, final TQueue queue) {
74 TQueue oldq = objMap.putIfAbsent(key, queue);
75 return oldq != null ? oldq : queue;
76 }
77
78 public TQueue remove(final TKey key) {
79 TQueue queue = objMap.get(key);
80 if (queue != null) {
81 lock.lock();
82 try {
83 queue = objMap.remove(key);
84 if (current != null && queue == current.getValue()) {
85 currentQuantum = 0;
86 current = null;
87 }
88 } finally {
89 lock.unlock();
90 }
91 }
92 return queue;
93 }
94
95 public void clear() {
96 lock.lock();
97 try {
98 currentQuantum = 0;
99 current = null;
100 objMap.clear();
101 } finally {
102 lock.unlock();
103 }
104 }
105
106
107
108
109 public TQueue poll() {
110 lock.lock();
111 try {
112 TQueue queue;
113 if (currentQuantum == 0) {
114 if (nextObject() == null) {
115
116 return null;
117 }
118
119 queue = current.getValue();
120 currentQuantum = calculateQuantum(queue) - 1;
121 } else {
122 currentQuantum--;
123 queue = current.getValue();
124 }
125
126 if (!queue.isAvailable()) {
127 Map.Entry<TKey, TQueue> last = current;
128
129 do {
130 if (nextObject() == null)
131 return null;
132 } while (current.getValue() != last.getValue() && !current.getValue().isAvailable());
133
134 queue = current.getValue();
135 currentQuantum = calculateQuantum(queue) - 1;
136 }
137
138 return queue;
139 } finally {
140 lock.unlock();
141 }
142 }
143
144 @Override
145 public String toString() {
146 StringBuilder builder = new StringBuilder();
147 builder.append('{');
148 for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) {
149 builder.append(entry.getKey());
150 builder.append(':');
151 builder.append(entry.getValue());
152 }
153 builder.append('}');
154 return builder.toString();
155 }
156
157 private Map.Entry<TKey, TQueue> nextObject() {
158 Map.Entry<TKey, TQueue> next = null;
159
160
161 if (current != null) {
162 next = objMap.higherEntry(current.getKey());
163 }
164
165
166 current = (next != null) ? next : objMap.firstEntry();
167 return current;
168 }
169
170 private int calculateQuantum(final TQueue fairObject) {
171
172 return Math.max(1, fairObject.getPriority() * quantum);
173 }
174 }