1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.procedure;
20
21 import java.io.IOException;
22 import java.util.ArrayDeque;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.ReentrantLock;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.TableExistsException;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.TableNotFoundException;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.classification.InterfaceStability;
35 import org.apache.hadoop.hbase.master.TableLockManager;
36 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
37 import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
38 import org.apache.hadoop.hbase.procedure2.Procedure;
39 import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 @InterfaceStability.Evolving
58 public class MasterProcedureScheduler implements ProcedureRunnableSet {
/** Logger used by the scheduler and by its nested queue classes. */
private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class);

/** Table lock manager handed to the constructor; used for the table read/write locks. */
private final TableLockManager lockManager;
/** Lock taken around every update of the run queues, the queue trees and {@code queueSize}. */
private final ReentrantLock schedLock = new ReentrantLock();
/** Condition (bound to {@code schedLock}) that pollers wait on while {@code queueSize} is 0. */
private final Condition schedWaitCond = schedLock.newCondition();

/** Fair run queue cycling over the server queues that are linked for execution. */
private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
/** Fair run queue cycling over the table queues that are linked for execution. */
private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
/** Number of procedures counted as runnable; see doAdd/doPoll/addToRunQueue/removeFromRunQueue. */
private int queueSize = 0;

/** Hash buckets; each slot holds the root of an AVL tree of server queues (or null). */
private final Object[] serverBuckets = new Object[128];
/** Not referenced anywhere in the visible part of this class. */
private Queue<String> namespaceMap = null;
/** Root of the AVL tree of table queues, or null when there is none. */
private Queue<TableName> tableMap = null;

/** Priority given to the queue of the meta table (read from configuration). */
private final int metaTablePriority;
/** Priority given to the queues of user tables (read from configuration). */
private final int userTablePriority;
/** Priority given to the queues of system tables (read from configuration). */
private final int sysTablePriority;


/** Number of poll(long) calls that reached the dequeue step. */
private long pollCalls = 0;
/** Number of those poll(long) calls that found no procedure to return. */
private long nullPollCalls = 0;
80
/**
 * Builds a scheduler bound to the given table lock manager.
 *
 * <p>The per-table-kind priorities are read from the configuration; when a key is absent
 * the defaults are 3 (meta table), 2 (system tables) and 1 (user tables).
 *
 * @param conf configuration the three priority settings are read from
 * @param lockManager lock manager stored for later table lock operations
 */
public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) {
  this.lockManager = lockManager;

  // The three lookups are independent of each other.
  this.userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
  this.sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
  this.metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
}
89
90 @Override
91 public void addFront(Procedure proc) {
92 doAdd(proc, true);
93 }
94
95 @Override
96 public void addBack(Procedure proc) {
97 doAdd(proc, false);
98 }
99
100 @Override
101 public void yield(final Procedure proc) {
102 doAdd(proc, isTableProcedure(proc));
103 }
104
105 private void doAdd(final Procedure proc, final boolean addFront) {
106 schedLock.lock();
107 try {
108 if (isTableProcedure(proc)) {
109 doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
110 } else if (isServerProcedure(proc)) {
111 doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
112 } else {
113
114
115
116
117 throw new UnsupportedOperationException(
118 "RQs for non-table/non-server procedures are not implemented yet");
119 }
120 schedWaitCond.signal();
121 } finally {
122 schedLock.unlock();
123 }
124 }
125
126 private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
127 final Queue<T> queue, final Procedure proc, final boolean addFront) {
128 queue.add(proc, addFront);
129 if (!(queue.isSuspended() || queue.hasExclusiveLock())) {
130 if (queue.size() == 1 && !IterableList.isLinked(queue)) {
131 fairq.add(queue);
132 }
133 queueSize++;
134 }
135 }
136
137 @Override
138 public Procedure poll() {
139 return poll(-1);
140 }
141
142 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
143 Procedure poll(long waitNsec) {
144 Procedure pollResult = null;
145 schedLock.lock();
146 try {
147 if (queueSize == 0) {
148 if (waitNsec < 0) {
149 schedWaitCond.await();
150 } else {
151 schedWaitCond.awaitNanos(waitNsec);
152 }
153 if (queueSize == 0) {
154 return null;
155 }
156 }
157
158
159
160
161 pollResult = doPoll(serverRunQueue);
162 if (pollResult == null) {
163 pollResult = doPoll(tableRunQueue);
164 }
165
166
167 pollCalls++;
168 nullPollCalls += (pollResult == null) ? 1 : 0;
169 } catch (InterruptedException e) {
170 Thread.currentThread().interrupt();
171 } finally {
172 schedLock.unlock();
173 }
174 return pollResult;
175 }
176
177 private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
178 Queue<T> rq = fairq.poll();
179 if (rq == null || !rq.isAvailable()) {
180 return null;
181 }
182
183 assert !rq.isSuspended() : "rq=" + rq + " is suspended";
184 Procedure pollResult = rq.poll();
185 this.queueSize--;
186 if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) {
187 removeFromRunQueue(fairq, rq);
188 }
189 return pollResult;
190 }
191
192 @Override
193 public void clear() {
194
195 schedLock.lock();
196 try {
197
198 for (int i = 0; i < serverBuckets.length; ++i) {
199 clear((ServerQueue)serverBuckets[i], serverRunQueue);
200 serverBuckets[i] = null;
201 }
202
203
204 clear(tableMap, tableRunQueue);
205 tableMap = null;
206
207 assert queueSize == 0 : "expected queue size to be 0, got " + queueSize;
208 } finally {
209 schedLock.unlock();
210 }
211 }
212
213 private <T extends Comparable<T>> void clear(Queue<T> treeMap, FairQueue<T> fairq) {
214 while (treeMap != null) {
215 Queue<T> node = AvlTree.getFirst(treeMap);
216 assert !node.isSuspended() : "can't clear suspended " + node.getKey();
217 treeMap = AvlTree.remove(treeMap, node.getKey());
218 removeFromRunQueue(fairq, node);
219 }
220 }
221
222 @Override
223 public void signalAll() {
224 schedLock.lock();
225 try {
226 schedWaitCond.signalAll();
227 } finally {
228 schedLock.unlock();
229 }
230 }
231
232 @Override
233 public int size() {
234 schedLock.lock();
235 try {
236 return queueSize;
237 } finally {
238 schedLock.unlock();
239 }
240 }
241
242 @Override
243 public void completionCleanup(Procedure proc) {
244 if (proc instanceof TableProcedureInterface) {
245 TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
246 boolean tableDeleted;
247 if (proc.hasException()) {
248 IOException procEx = proc.getException().unwrapRemoteException();
249 if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
250
251 tableDeleted = !(procEx instanceof TableExistsException);
252 } else {
253
254 tableDeleted = (procEx instanceof TableNotFoundException);
255 }
256 } else {
257
258 tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
259 }
260 if (tableDeleted) {
261 markTableAsDeleted(iProcTable.getTableName());
262 return;
263 }
264 } else {
265
266 return;
267 }
268 }
269
270 private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
271 if (IterableList.isLinked(queue)) return;
272 if (!queue.isEmpty()) {
273 fairq.add(queue);
274 queueSize += queue.size();
275 }
276 }
277
278 private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
279 if (!IterableList.isLinked(queue)) return;
280 fairq.remove(queue);
281 queueSize -= queue.size();
282 }
283
284
285
286
287 public long getPollCalls() {
288 return pollCalls;
289 }
290
291 public long getNullPollCalls() {
292 return nullPollCalls;
293 }
294
295
296
297
298 public boolean waitEvent(ProcedureEvent event, Procedure procedure) {
299 return waitEvent(event, procedure, false);
300 }
301
302 public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
303 synchronized (event) {
304 if (event.isReady()) {
305 return false;
306 }
307
308
309 if (!suspendQueue) suspendQueue = true;
310
311 if (isTableProcedure(procedure)) {
312 suspendTableQueue(event, getTableName(procedure));
313 } else if (isServerProcedure(procedure)) {
314 suspendServerQueue(event, getServerName(procedure));
315 } else {
316
317
318
319
320 throw new UnsupportedOperationException(
321 "RQs for non-table/non-server procedures are not implemented yet");
322 }
323 }
324 return true;
325 }
326
327 private void suspendTableQueue(ProcedureEvent event, TableName tableName) {
328 schedLock.lock();
329 try {
330 TableQueue queue = getTableQueue(tableName);
331 if (!queue.setSuspended(true)) return;
332
333 if (LOG.isDebugEnabled()) {
334 LOG.debug("Suspend table queue " + tableName);
335 }
336 removeFromRunQueue(tableRunQueue, queue);
337 event.suspendTableQueue(queue);
338 } finally {
339 schedLock.unlock();
340 }
341 }
342
343 private void suspendServerQueue(ProcedureEvent event, ServerName serverName) {
344 schedLock.lock();
345 try {
346
347 ServerQueue queue = getServerQueue(serverName);
348 if (!queue.setSuspended(true)) return;
349
350 if (LOG.isDebugEnabled()) {
351 LOG.debug("Suspend server queue " + serverName);
352 }
353 removeFromRunQueue(serverRunQueue, queue);
354 event.suspendServerQueue(queue);
355 } finally {
356 schedLock.unlock();
357 }
358 }
359
360 public void suspend(ProcedureEvent event) {
361 synchronized (event) {
362 event.setReady(false);
363 if (LOG.isDebugEnabled()) {
364 LOG.debug("Suspend event " + event);
365 }
366 }
367 }
368
369 public void wake(ProcedureEvent event) {
370 synchronized (event) {
371 event.setReady(true);
372 if (LOG.isDebugEnabled()) {
373 LOG.debug("Wake event " + event);
374 }
375
376 schedLock.lock();
377 try {
378 while (event.hasWaitingTables()) {
379 Queue<TableName> queue = event.popWaitingTable();
380 addToRunQueue(tableRunQueue, queue);
381 }
382
383 while (event.hasWaitingServers()) {
384 Queue<ServerName> queue = event.popWaitingServer();
385 addToRunQueue(serverRunQueue, queue);
386 }
387
388 if (queueSize > 1) {
389 schedWaitCond.signalAll();
390 } else if (queueSize > 0) {
391 schedWaitCond.signal();
392 }
393 } finally {
394 schedLock.unlock();
395 }
396 }
397 }
398
399 public static class ProcedureEvent {
400 private final String description;
401
402 private Queue<ServerName> waitingServers = null;
403 private Queue<TableName> waitingTables = null;
404 private boolean ready = false;
405
406 public ProcedureEvent(String description) {
407 this.description = description;
408 }
409
410 public synchronized boolean isReady() {
411 return ready;
412 }
413
414 private synchronized void setReady(boolean isReady) {
415 this.ready = isReady;
416 }
417
418 private void suspendTableQueue(Queue<TableName> queue) {
419 waitingTables = IterableList.append(waitingTables, queue);
420 }
421
422 private void suspendServerQueue(Queue<ServerName> queue) {
423 waitingServers = IterableList.append(waitingServers, queue);
424 }
425
426 private boolean hasWaitingTables() {
427 return waitingTables != null;
428 }
429
430 private Queue<TableName> popWaitingTable() {
431 Queue<TableName> node = waitingTables;
432 waitingTables = IterableList.remove(waitingTables, node);
433 node.setSuspended(false);
434 return node;
435 }
436
437 private boolean hasWaitingServers() {
438 return waitingServers != null;
439 }
440
441 private Queue<ServerName> popWaitingServer() {
442 Queue<ServerName> node = waitingServers;
443 waitingServers = IterableList.remove(waitingServers, node);
444 node.setSuspended(false);
445 return node;
446 }
447
448 @Override
449 public String toString() {
450 return String.format("ProcedureEvent(%s)", description);
451 }
452 }
453
454
455
456
457 private TableQueue getTableQueueWithLock(TableName tableName) {
458 schedLock.lock();
459 try {
460 return getTableQueue(tableName);
461 } finally {
462 schedLock.unlock();
463 }
464 }
465
466 private TableQueue getTableQueue(TableName tableName) {
467 Queue<TableName> node = AvlTree.get(tableMap, tableName);
468 if (node != null) return (TableQueue)node;
469
470 node = new TableQueue(tableName, getTablePriority(tableName));
471 tableMap = AvlTree.insert(tableMap, node);
472 return (TableQueue)node;
473 }
474
475 private void removeTableQueue(TableName tableName) {
476 tableMap = AvlTree.remove(tableMap, tableName);
477 }
478
479 private int getTablePriority(TableName tableName) {
480 if (tableName.equals(TableName.META_TABLE_NAME)) {
481 return metaTablePriority;
482 } else if (tableName.isSystemTable()) {
483 return sysTablePriority;
484 }
485 return userTablePriority;
486 }
487
488 private static boolean isTableProcedure(Procedure proc) {
489 return proc instanceof TableProcedureInterface;
490 }
491
492 private static TableName getTableName(Procedure proc) {
493 return ((TableProcedureInterface)proc).getTableName();
494 }
495
496
497
498
499 private ServerQueue getServerQueueWithLock(ServerName serverName) {
500 schedLock.lock();
501 try {
502 return getServerQueue(serverName);
503 } finally {
504 schedLock.unlock();
505 }
506 }
507
508 private ServerQueue getServerQueue(ServerName serverName) {
509 int index = getBucketIndex(serverBuckets, serverName.hashCode());
510 Queue<ServerName> root = getTreeRoot(serverBuckets, index);
511 Queue<ServerName> node = AvlTree.get(root, serverName);
512 if (node != null) return (ServerQueue)node;
513
514 node = new ServerQueue(serverName);
515 serverBuckets[index] = AvlTree.insert(root, node);
516 return (ServerQueue)node;
517 }
518
519 private void removeServerQueue(ServerName serverName) {
520 int index = getBucketIndex(serverBuckets, serverName.hashCode());
521 serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName);
522 }
523
524 @SuppressWarnings("unchecked")
525 private static <T extends Comparable<T>> Queue<T> getTreeRoot(Object[] buckets, int index) {
526 return (Queue<T>) buckets[index];
527 }
528
529 private static int getBucketIndex(Object[] buckets, int hashCode) {
530 return Math.abs(hashCode) % buckets.length;
531 }
532
533 private static boolean isServerProcedure(Procedure proc) {
534 return proc instanceof ServerProcedureInterface;
535 }
536
537 private static ServerName getServerName(Procedure proc) {
538 return ((ServerProcedureInterface)proc).getServerName();
539 }
540
541
542
543
544 public static class ServerQueue extends QueueImpl<ServerName> {
545 public ServerQueue(ServerName serverName) {
546 super(serverName);
547 }
548
549 public boolean requireExclusiveLock(Procedure proc) {
550 ServerProcedureInterface spi = (ServerProcedureInterface)proc;
551 switch (spi.getServerOperationType()) {
552 case CRASH_HANDLER:
553 return true;
554 default:
555 break;
556 }
557 throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType());
558 }
559 }
560
561 public static class TableQueue extends QueueImpl<TableName> {
562 private TableLock tableLock = null;
563
564 public TableQueue(TableName tableName, int priority) {
565 super(tableName, priority);
566 }
567
568
569
570
571 private boolean canAbortPendingOperations(Procedure proc) {
572 TableProcedureInterface tpi = (TableProcedureInterface)proc;
573 switch (tpi.getTableOperationType()) {
574 case DELETE:
575 return true;
576 default:
577 return false;
578 }
579 }
580
581 public boolean requireExclusiveLock(Procedure proc) {
582 TableProcedureInterface tpi = (TableProcedureInterface)proc;
583 switch (tpi.getTableOperationType()) {
584 case CREATE:
585 case DELETE:
586 case DISABLE:
587 case EDIT:
588 case ENABLE:
589 return true;
590 case READ:
591 return false;
592 default:
593 break;
594 }
595 throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType());
596 }
597
598 private synchronized boolean trySharedLock(final TableLockManager lockManager,
599 final String purpose) {
600 if (hasExclusiveLock()) return false;
601
602
603 TableName tableName = getKey();
604 tableLock = lockManager.readLock(tableName, purpose);
605 try {
606 tableLock.acquire();
607 } catch (IOException e) {
608 LOG.error("failed acquire read lock on " + tableName, e);
609 tableLock = null;
610 return false;
611 }
612
613 trySharedLock();
614 return true;
615 }
616
617 private synchronized void releaseSharedLock(final TableLockManager lockManager) {
618 releaseTableLock(lockManager, isSingleSharedLock());
619 releaseSharedLock();
620 }
621
622 private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager,
623 final String purpose) {
624
625 TableName tableName = getKey();
626 tableLock = lockManager.writeLock(tableName, purpose);
627 try {
628 tableLock.acquire();
629 } catch (IOException e) {
630 LOG.error("failed acquire write lock on " + tableName, e);
631 tableLock = null;
632 return false;
633 }
634 return true;
635 }
636
637 private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) {
638 releaseTableLock(lockManager, true);
639 }
640
641 private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
642 for (int i = 0; i < 3; ++i) {
643 try {
644 tableLock.release();
645 if (reset) {
646 tableLock = null;
647 }
648 break;
649 } catch (IOException e) {
650 LOG.warn("Could not release the table write-lock", e);
651 }
652 }
653 }
654 }
655
656
657
658
659
660
661
662
663
664
665
666 public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) {
667 schedLock.lock();
668 TableQueue queue = getTableQueue(table);
669
670 if (!queue.tryExclusiveLock(procedure.getProcId())) {
671 schedLock.unlock();
672 return false;
673 }
674
675 removeFromRunQueue(tableRunQueue, queue);
676 schedLock.unlock();
677
678
679 boolean hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
680 if (!hasXLock) {
681 schedLock.lock();
682 queue.releaseExclusiveLock();
683 addToRunQueue(tableRunQueue, queue);
684 schedLock.unlock();
685 }
686 return hasXLock;
687 }
688
689
690
691
692
693
694 public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) {
695 schedLock.lock();
696 TableQueue queue = getTableQueue(table);
697 schedLock.unlock();
698
699
700 queue.releaseZkExclusiveLock(lockManager);
701
702 schedLock.lock();
703 queue.releaseExclusiveLock();
704 addToRunQueue(tableRunQueue, queue);
705 schedLock.unlock();
706 }
707
708
709
710
711
712
713
714
715 public boolean tryAcquireTableSharedLock(final Procedure procedure, final TableName table) {
716 return tryAcquireTableQueueSharedLock(procedure, table) != null;
717 }
718
719 private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure,
720 final TableName table) {
721 TableQueue queue = getTableQueueWithLock(table);
722 if (!queue.trySharedLock(lockManager, procedure.toString())) {
723 return null;
724 }
725 return queue;
726 }
727
728
729
730
731
732
733 public void releaseTableSharedLock(final Procedure procedure, final TableName table) {
734 getTableQueueWithLock(table).releaseSharedLock(lockManager);
735 }
736
737
738
739
740
741
742
743
744
745 protected boolean markTableAsDeleted(final TableName table) {
746 final ReentrantLock l = schedLock;
747 l.lock();
748 try {
749 TableQueue queue = getTableQueue(table);
750 if (queue == null) return true;
751
752 if (queue.isEmpty() && queue.tryExclusiveLock(0)) {
753
754 if (IterableList.isLinked(queue)) {
755 tableRunQueue.remove(queue);
756 }
757
758
759 try {
760 lockManager.tableDeleted(table);
761 } catch (IOException e) {
762 LOG.warn("Received exception from TableLockManager.tableDeleted:", e);
763 }
764
765 removeTableQueue(table);
766 } else {
767
768 return false;
769 }
770 } finally {
771 l.unlock();
772 }
773 return true;
774 }
775
776
777
778
779
780
781
782
783
784
785
786 public boolean tryAcquireServerExclusiveLock(final Procedure procedure,
787 final ServerName serverName) {
788 schedLock.lock();
789 try {
790 ServerQueue queue = getServerQueue(serverName);
791 if (queue.tryExclusiveLock(procedure.getProcId())) {
792 removeFromRunQueue(serverRunQueue, queue);
793 return true;
794 }
795 } finally {
796 schedLock.unlock();
797 }
798 return false;
799 }
800
801
802
803
804
805
806
807 public void releaseServerExclusiveLock(final Procedure procedure,
808 final ServerName serverName) {
809 schedLock.lock();
810 try {
811 ServerQueue queue = getServerQueue(serverName);
812 queue.releaseExclusiveLock();
813 addToRunQueue(serverRunQueue, queue);
814 } finally {
815 schedLock.unlock();
816 }
817 }
818
819
820
821
822
823
824
825
826 public boolean tryAcquireServerSharedLock(final Procedure procedure,
827 final ServerName serverName) {
828 return getServerQueueWithLock(serverName).trySharedLock();
829 }
830
831
832
833
834
835
836
837 public void releaseServerSharedLock(final Procedure procedure,
838 final ServerName serverName) {
839 getServerQueueWithLock(serverName).releaseSharedLock();
840 }
841
842
843
844
845 private static interface QueueInterface {
846 boolean isAvailable();
847 boolean isEmpty();
848 int size();
849
850 void add(Procedure proc, boolean addFront);
851 boolean requireExclusiveLock(Procedure proc);
852 Procedure peek();
853 Procedure poll();
854
855 boolean isSuspended();
856 }
857
858 private static abstract class Queue<TKey extends Comparable<TKey>> implements QueueInterface {
859 private Queue<TKey> avlRight = null;
860 private Queue<TKey> avlLeft = null;
861 private int avlHeight = 1;
862
863 private Queue<TKey> iterNext = null;
864 private Queue<TKey> iterPrev = null;
865 private boolean suspended = false;
866
867 private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
868 private int sharedLock = 0;
869
870 private final TKey key;
871 private final int priority;
872
873 public Queue(TKey key) {
874 this(key, 1);
875 }
876
877 public Queue(TKey key, int priority) {
878 this.key = key;
879 this.priority = priority;
880 }
881
882 protected TKey getKey() {
883 return key;
884 }
885
886 protected int getPriority() {
887 return priority;
888 }
889
890
891
892
893 public boolean isSuspended() {
894 return suspended;
895 }
896
897 protected boolean setSuspended(boolean isSuspended) {
898 if (this.suspended == isSuspended) return false;
899 this.suspended = isSuspended;
900 return true;
901 }
902
903
904
905
906 public synchronized boolean isLocked() {
907 return hasExclusiveLock() || sharedLock > 0;
908 }
909
910 public synchronized boolean hasExclusiveLock() {
911 return this.exclusiveLockProcIdOwner != Long.MIN_VALUE;
912 }
913
914 public synchronized boolean trySharedLock() {
915 if (hasExclusiveLock()) return false;
916 sharedLock++;
917 return true;
918 }
919
920 public synchronized void releaseSharedLock() {
921 sharedLock--;
922 }
923
924 protected synchronized boolean isSingleSharedLock() {
925 return sharedLock == 1;
926 }
927
928 public synchronized boolean tryExclusiveLock(long procIdOwner) {
929 assert procIdOwner != Long.MIN_VALUE;
930 if (isLocked()) return false;
931 exclusiveLockProcIdOwner = procIdOwner;
932 return true;
933 }
934
935 public synchronized void releaseExclusiveLock() {
936 exclusiveLockProcIdOwner = Long.MIN_VALUE;
937 }
938
939
940
941 public synchronized boolean isAvailable() {
942 return !hasExclusiveLock() && !isEmpty();
943 }
944
945
946
947
948 public int compareKey(TKey cmpKey) {
949 return key.compareTo(cmpKey);
950 }
951
952 public int compareTo(Queue<TKey> other) {
953 return compareKey(other.key);
954 }
955
956 @Override
957 public String toString() {
958 return String.format("%s(%s)", getClass().getSimpleName(), key);
959 }
960 }
961
962
963
964
965 private static abstract class QueueImpl<TKey extends Comparable<TKey>> extends Queue<TKey> {
966 private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>();
967
968 public QueueImpl(TKey key) {
969 super(key);
970 }
971
972 public QueueImpl(TKey key, int priority) {
973 super(key, priority);
974 }
975
976 public void add(final Procedure proc, final boolean addToFront) {
977 if (addToFront) {
978 addFront(proc);
979 } else {
980 addBack(proc);
981 }
982 }
983
984 protected void addFront(final Procedure proc) {
985 runnables.addFirst(proc);
986 }
987
988 protected void addBack(final Procedure proc) {
989 runnables.addLast(proc);
990 }
991
992 public Procedure peek() {
993 return runnables.peek();
994 }
995
996 @Override
997 public Procedure poll() {
998 return runnables.poll();
999 }
1000
1001 @Override
1002 public boolean isEmpty() {
1003 return runnables.isEmpty();
1004 }
1005
1006 public int size() {
1007 return runnables.size();
1008 }
1009 }
1010
1011 private static class FairQueue<T extends Comparable<T>> {
1012 private final int quantum;
1013
1014 private Queue<T> currentQueue = null;
1015 private Queue<T> queueHead = null;
1016 private int currentQuantum = 0;
1017
1018 public FairQueue() {
1019 this(1);
1020 }
1021
1022 public FairQueue(int quantum) {
1023 this.quantum = quantum;
1024 }
1025
1026 public void add(Queue<T> queue) {
1027 queueHead = IterableList.append(queueHead, queue);
1028 if (currentQueue == null) setNextQueue(queueHead);
1029 }
1030
1031 public void remove(Queue<T> queue) {
1032 Queue<T> nextQueue = queue.iterNext;
1033 queueHead = IterableList.remove(queueHead, queue);
1034 if (currentQueue == queue) {
1035 setNextQueue(queueHead != null ? nextQueue : null);
1036 }
1037 }
1038
1039 public Queue<T> poll() {
1040 if (currentQuantum == 0) {
1041 if (!nextQueue()) {
1042 return null;
1043 }
1044 currentQuantum = calculateQuantum(currentQueue) - 1;
1045 } else {
1046 currentQuantum--;
1047 }
1048
1049
1050 if (!currentQueue.isAvailable()) {
1051 Queue<T> lastQueue = currentQueue;
1052 do {
1053 if (!nextQueue())
1054 return null;
1055 } while (currentQueue != lastQueue && !currentQueue.isAvailable());
1056
1057 currentQuantum = calculateQuantum(currentQueue) - 1;
1058 }
1059 return currentQueue;
1060 }
1061
1062 private boolean nextQueue() {
1063 if (currentQueue == null) return false;
1064 currentQueue = currentQueue.iterNext;
1065 return currentQueue != null;
1066 }
1067
1068 private void setNextQueue(Queue<T> queue) {
1069 currentQueue = queue;
1070 if (queue != null) {
1071 currentQuantum = calculateQuantum(currentQueue);
1072 } else {
1073 currentQuantum = 0;
1074 }
1075 }
1076
1077 private int calculateQuantum(final Queue queue) {
1078 return Math.max(1, queue.getPriority() * quantum);
1079 }
1080 }
1081
1082 private static class AvlTree {
1083 public static <T extends Comparable<T>> Queue<T> get(Queue<T> root, T key) {
1084 while (root != null) {
1085 int cmp = root.compareKey(key);
1086 if (cmp > 0) {
1087 root = root.avlLeft;
1088 } else if (cmp < 0) {
1089 root = root.avlRight;
1090 } else {
1091 return root;
1092 }
1093 }
1094 return null;
1095 }
1096
1097 public static <T extends Comparable<T>> Queue<T> getFirst(Queue<T> root) {
1098 if (root != null) {
1099 while (root.avlLeft != null) {
1100 root = root.avlLeft;
1101 }
1102 }
1103 return root;
1104 }
1105
1106 public static <T extends Comparable<T>> Queue<T> getLast(Queue<T> root) {
1107 if (root != null) {
1108 while (root.avlRight != null) {
1109 root = root.avlRight;
1110 }
1111 }
1112 return root;
1113 }
1114
1115 public static <T extends Comparable<T>> Queue<T> insert(Queue<T> root, Queue<T> node) {
1116 if (root == null) return node;
1117 if (node.compareTo(root) < 0) {
1118 root.avlLeft = insert(root.avlLeft, node);
1119 } else {
1120 root.avlRight = insert(root.avlRight, node);
1121 }
1122 return balance(root);
1123 }
1124
1125 private static <T extends Comparable<T>> Queue<T> removeMin(Queue<T> p) {
1126 if (p.avlLeft == null)
1127 return p.avlRight;
1128 p.avlLeft = removeMin(p.avlLeft);
1129 return balance(p);
1130 }
1131
1132 public static <T extends Comparable<T>> Queue<T> remove(Queue<T> root, T key) {
1133 if (root == null) return null;
1134
1135 int cmp = root.compareKey(key);
1136 if (cmp == 0) {
1137 Queue<T> q = root.avlLeft;
1138 Queue<T> r = root.avlRight;
1139 if (r == null) return q;
1140 Queue<T> min = getFirst(r);
1141 min.avlRight = removeMin(r);
1142 min.avlLeft = q;
1143 return balance(min);
1144 } else if (cmp > 0) {
1145 root.avlLeft = remove(root.avlLeft, key);
1146 } else
1147 root.avlRight = remove(root.avlRight, key);
1148 }
1149 return balance(root);
1150 }
1151
1152 private static <T extends Comparable<T>> Queue<T> balance(Queue<T> p) {
1153 fixHeight(p);
1154 int balance = balanceFactor(p);
1155 if (balance == 2) {
1156 if (balanceFactor(p.avlRight) < 0) {
1157 p.avlRight = rotateRight(p.avlRight);
1158 }
1159 return rotateLeft(p);
1160 } else if (balance == -2) {
1161 if (balanceFactor(p.avlLeft) > 0) {
1162 p.avlLeft = rotateLeft(p.avlLeft);
1163 }
1164 return rotateRight(p);
1165 }
1166 return p;
1167 }
1168
1169 private static <T extends Comparable<T>> Queue<T> rotateRight(Queue<T> p) {
1170 Queue<T> q = p.avlLeft;
1171 p.avlLeft = q.avlRight;
1172 q.avlRight = p;
1173 fixHeight(p);
1174 fixHeight(q);
1175 return q;
1176 }
1177
1178 private static <T extends Comparable<T>> Queue<T> rotateLeft(Queue<T> q) {
1179 Queue<T> p = q.avlRight;
1180 q.avlRight = p.avlLeft;
1181 p.avlLeft = q;
1182 fixHeight(q);
1183 fixHeight(p);
1184 return p;
1185 }
1186
1187 private static <T extends Comparable<T>> void fixHeight(Queue<T> node) {
1188 int heightLeft = height(node.avlLeft);
1189 int heightRight = height(node.avlRight);
1190 node.avlHeight = 1 + Math.max(heightLeft, heightRight);
1191 }
1192
1193 private static <T extends Comparable<T>> int height(Queue<T> node) {
1194 return node != null ? node.avlHeight : 0;
1195 }
1196
1197 private static <T extends Comparable<T>> int balanceFactor(Queue<T> node) {
1198 return height(node.avlRight) - height(node.avlLeft);
1199 }
1200 }
1201
1202 private static class IterableList {
1203 public static <T extends Comparable<T>> Queue<T> prepend(Queue<T> head, Queue<T> node) {
1204 assert !isLinked(node) : node + " is already linked";
1205 if (head != null) {
1206 Queue<T> tail = head.iterPrev;
1207 tail.iterNext = node;
1208 head.iterPrev = node;
1209 node.iterNext = head;
1210 node.iterPrev = tail;
1211 } else {
1212 node.iterNext = node;
1213 node.iterPrev = node;
1214 }
1215 return node;
1216 }
1217
1218 public static <T extends Comparable<T>> Queue<T> append(Queue<T> head, Queue<T> node) {
1219 assert !isLinked(node) : node + " is already linked";
1220 if (head != null) {
1221 Queue<T> tail = head.iterPrev;
1222 tail.iterNext = node;
1223 node.iterNext = head;
1224 node.iterPrev = tail;
1225 head.iterPrev = node;
1226 return head;
1227 }
1228 node.iterNext = node;
1229 node.iterPrev = node;
1230 return node;
1231 }
1232
1233 public static <T extends Comparable<T>> Queue<T> appendList(Queue<T> head, Queue<T> otherHead) {
1234 if (head == null) return otherHead;
1235 if (otherHead == null) return head;
1236
1237 Queue<T> tail = head.iterPrev;
1238 Queue<T> otherTail = otherHead.iterPrev;
1239 tail.iterNext = otherHead;
1240 otherHead.iterPrev = tail;
1241 otherTail.iterNext = head;
1242 head.iterPrev = otherTail;
1243 return head;
1244 }
1245
1246 private static <T extends Comparable<T>> Queue<T> remove(Queue<T> head, Queue<T> node) {
1247 assert isLinked(node) : node + " is not linked";
1248 if (node != node.iterNext) {
1249 node.iterPrev.iterNext = node.iterNext;
1250 node.iterNext.iterPrev = node.iterPrev;
1251 head = (head == node) ? node.iterNext : head;
1252 } else {
1253 head = null;
1254 }
1255 node.iterNext = null;
1256 node.iterPrev = null;
1257 return head;
1258 }
1259
1260 private static <T extends Comparable<T>> boolean isLinked(Queue<T> node) {
1261 return node.iterPrev != null && node.iterNext != null;
1262 }
1263 }
1264 }