1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.procedure;
20
21 import java.io.IOException;
22 import java.util.ArrayDeque;
23 import java.util.Deque;
24 import java.util.concurrent.locks.Condition;
25 import java.util.concurrent.locks.ReentrantLock;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.TableExistsException;
32 import org.apache.hadoop.hbase.TableNotFoundException;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.classification.InterfaceStability;
35 import org.apache.hadoop.hbase.procedure2.Procedure;
36 import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues;
37 import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
38 import org.apache.hadoop.hbase.master.TableLockManager;
39 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
40 import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 @InterfaceStability.Evolving
58 public class MasterProcedureQueue implements ProcedureRunnableSet {
59 private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class);
60
61 private final ProcedureFairRunQueues<TableName, RunQueue> fairq;
62 private final ReentrantLock lock = new ReentrantLock();
63 private final Condition waitCond = lock.newCondition();
64 private final TableLockManager lockManager;
65
66 private final int metaTablePriority;
67 private final int userTablePriority;
68 private final int sysTablePriority;
69
70 private int queueSize;
71
72 public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) {
73 this.fairq = new ProcedureFairRunQueues<TableName, RunQueue>(1);
74 this.lockManager = lockManager;
75
76
77 metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
78 sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
79 userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
80 }
81
82 @Override
83 public void addFront(final Procedure proc) {
84 lock.lock();
85 try {
86 getRunQueueOrCreate(proc).addFront(proc);
87 queueSize++;
88 waitCond.signal();
89 } finally {
90 lock.unlock();
91 }
92 }
93
94 @Override
95 public void addBack(final Procedure proc) {
96 lock.lock();
97 try {
98 getRunQueueOrCreate(proc).addBack(proc);
99 queueSize++;
100 waitCond.signal();
101 } finally {
102 lock.unlock();
103 }
104 }
105
106 @Override
107 public void yield(final Procedure proc) {
108 addFront(proc);
109 }
110
111 @Override
112 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
113 public Long poll() {
114 lock.lock();
115 try {
116 if (queueSize == 0) {
117 waitCond.await();
118 if (queueSize == 0) {
119 return null;
120 }
121 }
122
123 RunQueue queue = fairq.poll();
124 if (queue != null && queue.isAvailable()) {
125 queueSize--;
126 return queue.poll();
127 }
128 } catch (InterruptedException e) {
129 Thread.currentThread().interrupt();
130 return null;
131 } finally {
132 lock.unlock();
133 }
134 return null;
135 }
136
137 @Override
138 public void signalAll() {
139 lock.lock();
140 try {
141 waitCond.signalAll();
142 } finally {
143 lock.unlock();
144 }
145 }
146
147 @Override
148 public void clear() {
149 lock.lock();
150 try {
151 fairq.clear();
152 queueSize = 0;
153 } finally {
154 lock.unlock();
155 }
156 }
157
158 @Override
159 public int size() {
160 lock.lock();
161 try {
162 return queueSize;
163 } finally {
164 lock.unlock();
165 }
166 }
167
168 @Override
169 public String toString() {
170 lock.lock();
171 try {
172 return "MasterProcedureQueue size=" + queueSize + ": " + fairq;
173 } finally {
174 lock.unlock();
175 }
176 }
177
178 @Override
179 public void completionCleanup(Procedure proc) {
180 if (proc instanceof TableProcedureInterface) {
181 TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
182 boolean tableDeleted;
183 if (proc.hasException()) {
184 IOException procEx = proc.getException().unwrapRemoteException();
185 if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
186
187 tableDeleted = !(procEx instanceof TableExistsException);
188 } else {
189
190 tableDeleted = (procEx instanceof TableNotFoundException);
191 }
192 } else {
193
194 tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
195 }
196 if (tableDeleted) {
197 markTableAsDeleted(iProcTable.getTableName());
198 }
199 }
200 }
201
202 private RunQueue getRunQueueOrCreate(final Procedure proc) {
203 if (proc instanceof TableProcedureInterface) {
204 final TableName table = ((TableProcedureInterface)proc).getTableName();
205 return getRunQueueOrCreate(table);
206 }
207
208
209
210
211 throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet");
212 }
213
214 private TableRunQueue getRunQueueOrCreate(final TableName table) {
215 final TableRunQueue queue = getRunQueue(table);
216 if (queue != null) return queue;
217 return (TableRunQueue)fairq.add(table, createTableRunQueue(table));
218 }
219
220 private TableRunQueue createTableRunQueue(final TableName table) {
221 int priority = userTablePriority;
222 if (table.equals(TableName.META_TABLE_NAME)) {
223 priority = metaTablePriority;
224 } else if (table.isSystemTable()) {
225 priority = sysTablePriority;
226 }
227 return new TableRunQueue(priority);
228 }
229
230 private TableRunQueue getRunQueue(final TableName table) {
231 return (TableRunQueue)fairq.get(table);
232 }
233
234
235
236
237
238
239
240
241
242 public boolean tryAcquireTableRead(final TableName table, final String purpose) {
243 return getRunQueueOrCreate(table).tryRead(lockManager, table, purpose);
244 }
245
246
247
248
249
250 public void releaseTableRead(final TableName table) {
251 getRunQueue(table).releaseRead(lockManager, table);
252 }
253
254
255
256
257
258
259
260
261 public boolean tryAcquireTableWrite(final TableName table, final String purpose) {
262 return getRunQueueOrCreate(table).tryWrite(lockManager, table, purpose);
263 }
264
265
266
267
268
269 public void releaseTableWrite(final TableName table) {
270 getRunQueue(table).releaseWrite(lockManager, table);
271 }
272
273
274
275
276
277
278
279
280
281 protected boolean markTableAsDeleted(final TableName table) {
282 TableRunQueue queue = getRunQueue(table);
283 if (queue != null) {
284 lock.lock();
285 try {
286 if (queue.isEmpty() && queue.acquireDeleteLock()) {
287 fairq.remove(table);
288
289
290 try {
291 lockManager.tableDeleted(table);
292 } catch (IOException e) {
293 LOG.warn("Received exception from TableLockManager.tableDeleted:", e);
294 }
295 } else {
296
297 return false;
298 }
299 } finally {
300 lock.unlock();
301 }
302 }
303 return true;
304 }
305
306 private interface RunQueue extends ProcedureFairRunQueues.FairObject {
307 void addFront(Procedure proc);
308 void addBack(Procedure proc);
309 Long poll();
310 boolean acquireDeleteLock();
311 }
312
313
314
315
316
317
318 private static class TableRunQueue implements RunQueue {
319 private final Deque<Long> runnables = new ArrayDeque<Long>();
320 private final int priority;
321
322 private TableLock tableLock = null;
323 private boolean wlock = false;
324 private int rlock = 0;
325
326 public TableRunQueue(int priority) {
327 this.priority = priority;
328 }
329
330 @Override
331 public void addFront(final Procedure proc) {
332 runnables.addFirst(proc.getProcId());
333 }
334
335
336
337 @Override
338 public void addBack(final Procedure proc) {
339 runnables.addLast(proc.getProcId());
340 }
341
342 @Override
343 public Long poll() {
344 return runnables.poll();
345 }
346
347 @Override
348 public boolean isAvailable() {
349 synchronized (this) {
350 return !wlock && !runnables.isEmpty();
351 }
352 }
353
354 public boolean isEmpty() {
355 return runnables.isEmpty();
356 }
357
358 @Override
359 public synchronized boolean acquireDeleteLock() {
360 if (isLocked()) {
361 return false;
362 }
363 wlock = true;
364 return true;
365 }
366
367 public boolean isLocked() {
368 synchronized (this) {
369 return wlock || rlock > 0;
370 }
371 }
372
373 public boolean tryRead(final TableLockManager lockManager,
374 final TableName tableName, final String purpose) {
375 synchronized (this) {
376 if (wlock) {
377 return false;
378 }
379
380
381 tableLock = lockManager.readLock(tableName, purpose);
382 try {
383 tableLock.acquire();
384 } catch (IOException e) {
385 LOG.error("failed acquire read lock on " + tableName, e);
386 tableLock = null;
387 return false;
388 }
389
390 rlock++;
391 }
392 return true;
393 }
394
395 public void releaseRead(final TableLockManager lockManager,
396 final TableName tableName) {
397 synchronized (this) {
398 releaseTableLock(lockManager, rlock == 1);
399 rlock--;
400 }
401 }
402
403 public boolean tryWrite(final TableLockManager lockManager,
404 final TableName tableName, final String purpose) {
405 synchronized (this) {
406 if (wlock || rlock > 0) {
407 return false;
408 }
409
410
411 tableLock = lockManager.writeLock(tableName, purpose);
412 try {
413 tableLock.acquire();
414 } catch (IOException e) {
415 LOG.error("failed acquire write lock on " + tableName, e);
416 tableLock = null;
417 return false;
418 }
419 wlock = true;
420 }
421 return true;
422 }
423
424 public void releaseWrite(final TableLockManager lockManager,
425 final TableName tableName) {
426 synchronized (this) {
427 releaseTableLock(lockManager, true);
428 wlock = false;
429 }
430 }
431
432 private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
433 for (int i = 0; i < 3; ++i) {
434 try {
435 tableLock.release();
436 if (reset) {
437 tableLock = null;
438 }
439 break;
440 } catch (IOException e) {
441 LOG.warn("Could not release the table write-lock", e);
442 }
443 }
444 }
445
446 @Override
447 public int getPriority() {
448 return priority;
449 }
450
451 @Override
452 public String toString() {
453 return runnables.toString();
454 }
455 }
456 }