View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.master.procedure;
20  
21  import java.io.IOException;
22  import java.util.ArrayDeque;
23  import java.util.concurrent.locks.Condition;
24  import java.util.concurrent.locks.ReentrantLock;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.ServerName;
30  import org.apache.hadoop.hbase.TableExistsException;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.TableNotFoundException;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.classification.InterfaceStability;
35  import org.apache.hadoop.hbase.master.TableLockManager;
36  import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
37  import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
38  import org.apache.hadoop.hbase.procedure2.Procedure;
39  import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
40  
41  /**
42   * ProcedureRunnableSet for the Master Procedures.
43   * This RunnableSet tries to provide to the ProcedureExecutor procedures
44   * that can be executed without having to wait on a lock.
45   * Most of the master operations can be executed concurrently, if they
46   * are operating on different tables (e.g. two create-table operations can be performed
47   * at the same time, assuming one targets table A and the other table B) or against two
48   * different servers, e.g. two servers that crashed at about the same time.
49   *
50   * <p>Each procedure should implement an interface providing information for this queue.
51   * For example, table related procedures should implement TableProcedureInterface.
52   * Each procedure will be pushed in its own queue, and based on the operation type
53   * we may take smarter decisions, e.g. we can abort all the operations preceding
54   * a delete table, or similar.
55   */
56  @InterfaceAudience.Private
57  @InterfaceStability.Evolving
58  public class MasterProcedureScheduler implements ProcedureRunnableSet {
  private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class);

  // Used to take/release the table locks ("zk" locks) held by the TableQueues.
  private final TableLockManager lockManager;
  // Guards the run-queues, queueSize and the table/server queue maps below.
  private final ReentrantLock schedLock = new ReentrantLock();
  // Signalled when procedures are added or woken; poll() waits on it when queueSize is 0.
  private final Condition schedWaitCond = schedLock.newCondition();

  // Run-queues holding the server/table queues that currently have runnable procedures.
  private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
  private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
  // Number of procedures reachable through the run-queues; procedures added to a suspended
  // or exclusively-locked queue are not counted until that queue is re-added.
  private int queueSize = 0;

  // Server queues: hash buckets (see getBucketIndex), each bucket is the root of an AVL tree.
  private final Object[] serverBuckets = new Object[128];
  // NOTE(review): namespaceMap is not referenced in the visible part of this file.
  private Queue<String> namespaceMap = null;
  // Root of the AVL tree of table queues keyed by table name (null when there are none).
  private Queue<TableName> tableMap = null;

  // Queue priorities by table class, read from the configuration in the constructor.
  private final int metaTablePriority;
  private final int userTablePriority;
  private final int sysTablePriority;

  // TODO: metrics
  private long pollCalls = 0;
  private long nullPollCalls = 0;
80  
81    public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) {
82      this.lockManager = lockManager;
83  
84      // TODO: should this be part of the HTD?
85      metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
86      sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
87      userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
88    }
89  
90    @Override
91    public void addFront(Procedure proc) {
92      doAdd(proc, true);
93    }
94  
95    @Override
96    public void addBack(Procedure proc) {
97      doAdd(proc, false);
98    }
99  
100   @Override
101   public void yield(final Procedure proc) {
102     doAdd(proc, isTableProcedure(proc));
103   }
104 
105   private void doAdd(final Procedure proc, final boolean addFront) {
106     schedLock.lock();
107     try {
108       if (isTableProcedure(proc)) {
109         doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
110       } else if (isServerProcedure(proc)) {
111         doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
112       } else {
113         // TODO: at the moment we only have Table and Server procedures
114         // if you are implementing a non-table/non-server procedure, you have two options: create
115         // a group for all the non-table/non-server procedures or try to find a key for your
116         // non-table/non-server procedures and implement something similar to the TableRunQueue.
117         throw new UnsupportedOperationException(
118           "RQs for non-table/non-server procedures are not implemented yet");
119       }
120       schedWaitCond.signal();
121     } finally {
122       schedLock.unlock();
123     }
124   }
125 
126   private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
127       final Queue<T> queue, final Procedure proc, final boolean addFront) {
128     queue.add(proc, addFront);
129     if (!(queue.isSuspended() || queue.hasExclusiveLock())) {
130       if (queue.size() == 1 && !IterableList.isLinked(queue)) {
131         fairq.add(queue);
132       }
133       queueSize++;
134     }
135   }
136 
137   @Override
138   public Procedure poll() {
139     return poll(-1);
140   }
141 
142   @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
143   Procedure poll(long waitNsec) {
144     Procedure pollResult = null;
145     schedLock.lock();
146     try {
147       if (queueSize == 0) {
148         if (waitNsec < 0) {
149           schedWaitCond.await();
150         } else {
151           schedWaitCond.awaitNanos(waitNsec);
152         }
153         if (queueSize == 0) {
154           return null;
155         }
156       }
157 
158       // For now, let server handling have precedence over table handling; presumption is that it
159       // is more important handling crashed servers than it is running the
160       // enabling/disabling tables, etc.
161       pollResult = doPoll(serverRunQueue);
162       if (pollResult == null) {
163         pollResult = doPoll(tableRunQueue);
164       }
165 
166       // update metrics
167       pollCalls++;
168       nullPollCalls += (pollResult == null) ? 1 : 0;
169     } catch (InterruptedException e) {
170       Thread.currentThread().interrupt();
171     } finally {
172       schedLock.unlock();
173     }
174     return pollResult;
175   }
176 
177   private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
178     Queue<T> rq = fairq.poll();
179     if (rq == null || !rq.isAvailable()) {
180       return null;
181     }
182 
183     assert !rq.isSuspended() : "rq=" + rq + " is suspended";
184     Procedure pollResult = rq.poll();
185     this.queueSize--;
186     if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) {
187       removeFromRunQueue(fairq, rq);
188     }
189     return pollResult;
190   }
191 
192   @Override
193   public void clear() {
194     // NOTE: USED ONLY FOR TESTING
195     schedLock.lock();
196     try {
197       // Remove Servers
198       for (int i = 0; i < serverBuckets.length; ++i) {
199         clear((ServerQueue)serverBuckets[i], serverRunQueue);
200         serverBuckets[i] = null;
201       }
202 
203       // Remove Tables
204       clear(tableMap, tableRunQueue);
205       tableMap = null;
206 
207       assert queueSize == 0 : "expected queue size to be 0, got " + queueSize;
208     } finally {
209       schedLock.unlock();
210     }
211   }
212 
213   private <T extends Comparable<T>> void clear(Queue<T> treeMap, FairQueue<T> fairq) {
214     while (treeMap != null) {
215       Queue<T> node = AvlTree.getFirst(treeMap);
216       assert !node.isSuspended() : "can't clear suspended " + node.getKey();
217       treeMap = AvlTree.remove(treeMap, node.getKey());
218       removeFromRunQueue(fairq, node);
219     }
220   }
221 
222   @Override
223   public void signalAll() {
224     schedLock.lock();
225     try {
226       schedWaitCond.signalAll();
227     } finally {
228       schedLock.unlock();
229     }
230   }
231 
232   @Override
233   public int size() {
234     schedLock.lock();
235     try {
236       return queueSize;
237     } finally {
238       schedLock.unlock();
239     }
240   }
241 
242   @Override
243   public void completionCleanup(Procedure proc) {
244     if (proc instanceof TableProcedureInterface) {
245       TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
246       boolean tableDeleted;
247       if (proc.hasException()) {
248         IOException procEx =  proc.getException().unwrapRemoteException();
249         if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
250           // create failed because the table already exist
251           tableDeleted = !(procEx instanceof TableExistsException);
252         } else {
253           // the operation failed because the table does not exist
254           tableDeleted = (procEx instanceof TableNotFoundException);
255         }
256       } else {
257         // the table was deleted
258         tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
259       }
260       if (tableDeleted) {
261         markTableAsDeleted(iProcTable.getTableName());
262         return;
263       }
264     } else {
265       // No cleanup for ServerProcedureInterface types, yet.
266       return;
267     }
268   }
269 
270   private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
271     if (IterableList.isLinked(queue)) return;
272     if (!queue.isEmpty())  {
273       fairq.add(queue);
274       queueSize += queue.size();
275     }
276   }
277 
278   private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
279     if (!IterableList.isLinked(queue)) return;
280     fairq.remove(queue);
281     queueSize -= queue.size();
282   }
283 
284   // ============================================================================
285   //  TODO: Metrics
286   // ============================================================================
287   public long getPollCalls() {
288     return pollCalls;
289   }
290 
291   public long getNullPollCalls() {
292     return nullPollCalls;
293   }
294 
295   // ============================================================================
296   //  Event Helpers
297   // ============================================================================
298   public boolean waitEvent(ProcedureEvent event, Procedure procedure) {
299     return waitEvent(event, procedure, false);
300   }
301 
302   public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
303     synchronized (event) {
304       if (event.isReady()) {
305         return false;
306       }
307 
308       // TODO: Suspend single procedure not implemented yet, fallback to suspending the queue
309       if (!suspendQueue) suspendQueue = true;
310 
311       if (isTableProcedure(procedure)) {
312         suspendTableQueue(event, getTableName(procedure));
313       } else if (isServerProcedure(procedure)) {
314         suspendServerQueue(event, getServerName(procedure));
315       } else {
316         // TODO: at the moment we only have Table and Server procedures
317         // if you are implementing a non-table/non-server procedure, you have two options: create
318         // a group for all the non-table/non-server procedures or try to find a key for your
319         // non-table/non-server procedures and implement something similar to the TableRunQueue.
320         throw new UnsupportedOperationException(
321           "RQs for non-table/non-server procedures are not implemented yet");
322       }
323     }
324     return true;
325   }
326 
327   private void suspendTableQueue(ProcedureEvent event, TableName tableName) {
328     schedLock.lock();
329     try {
330       TableQueue queue = getTableQueue(tableName);
331       if (!queue.setSuspended(true)) return;
332 
333       if (LOG.isDebugEnabled()) {
334         LOG.debug("Suspend table queue " + tableName);
335       }
336       removeFromRunQueue(tableRunQueue, queue);
337       event.suspendTableQueue(queue);
338     } finally {
339       schedLock.unlock();
340     }
341   }
342 
343   private void suspendServerQueue(ProcedureEvent event, ServerName serverName) {
344     schedLock.lock();
345     try {
346       // TODO: This will change once we have the new AM
347       ServerQueue queue = getServerQueue(serverName);
348       if (!queue.setSuspended(true)) return;
349 
350       if (LOG.isDebugEnabled()) {
351         LOG.debug("Suspend server queue " + serverName);
352       }
353       removeFromRunQueue(serverRunQueue, queue);
354       event.suspendServerQueue(queue);
355     } finally {
356       schedLock.unlock();
357     }
358   }
359 
360   public void suspend(ProcedureEvent event) {
361     synchronized (event) {
362       event.setReady(false);
363       if (LOG.isDebugEnabled()) {
364         LOG.debug("Suspend event " + event);
365       }
366     }
367   }
368 
369   public void wake(ProcedureEvent event) {
370     synchronized (event) {
371       event.setReady(true);
372       if (LOG.isDebugEnabled()) {
373         LOG.debug("Wake event " + event);
374       }
375 
376       schedLock.lock();
377       try {
378         while (event.hasWaitingTables()) {
379           Queue<TableName> queue = event.popWaitingTable();
380           addToRunQueue(tableRunQueue, queue);
381         }
382         // TODO: This will change once we have the new AM
383         while (event.hasWaitingServers()) {
384           Queue<ServerName> queue = event.popWaitingServer();
385           addToRunQueue(serverRunQueue, queue);
386         }
387 
388         if (queueSize > 1) {
389           schedWaitCond.signalAll();
390         } else if (queueSize > 0) {
391           schedWaitCond.signal();
392         }
393       } finally {
394         schedLock.unlock();
395       }
396     }
397   }
398 
399   public static class ProcedureEvent {
400     private final String description;
401 
402     private Queue<ServerName> waitingServers = null;
403     private Queue<TableName> waitingTables = null;
404     private boolean ready = false;
405 
406     public ProcedureEvent(String description) {
407       this.description = description;
408     }
409 
410     public synchronized boolean isReady() {
411       return ready;
412     }
413 
414     private synchronized void setReady(boolean isReady) {
415       this.ready = isReady;
416     }
417 
418     private void suspendTableQueue(Queue<TableName> queue) {
419       waitingTables = IterableList.append(waitingTables, queue);
420     }
421 
422     private void suspendServerQueue(Queue<ServerName> queue) {
423       waitingServers = IterableList.append(waitingServers, queue);
424     }
425 
426     private boolean hasWaitingTables() {
427       return waitingTables != null;
428     }
429 
430     private Queue<TableName> popWaitingTable() {
431       Queue<TableName> node = waitingTables;
432       waitingTables = IterableList.remove(waitingTables, node);
433       node.setSuspended(false);
434       return node;
435     }
436 
437     private boolean hasWaitingServers() {
438       return waitingServers != null;
439     }
440 
441     private Queue<ServerName> popWaitingServer() {
442       Queue<ServerName> node = waitingServers;
443       waitingServers = IterableList.remove(waitingServers, node);
444       node.setSuspended(false);
445       return node;
446     }
447 
448     @Override
449     public String toString() {
450       return String.format("ProcedureEvent(%s)", description);
451     }
452   }
453 
454   // ============================================================================
455   //  Table Queue Lookup Helpers
456   // ============================================================================
457   private TableQueue getTableQueueWithLock(TableName tableName) {
458     schedLock.lock();
459     try {
460       return getTableQueue(tableName);
461     } finally {
462       schedLock.unlock();
463     }
464   }
465 
466   private TableQueue getTableQueue(TableName tableName) {
467     Queue<TableName> node = AvlTree.get(tableMap, tableName);
468     if (node != null) return (TableQueue)node;
469 
470     node = new TableQueue(tableName, getTablePriority(tableName));
471     tableMap = AvlTree.insert(tableMap, node);
472     return (TableQueue)node;
473   }
474 
475   private void removeTableQueue(TableName tableName) {
476     tableMap = AvlTree.remove(tableMap, tableName);
477   }
478 
479   private int getTablePriority(TableName tableName) {
480     if (tableName.equals(TableName.META_TABLE_NAME)) {
481       return metaTablePriority;
482     } else if (tableName.isSystemTable()) {
483       return sysTablePriority;
484     }
485     return userTablePriority;
486   }
487 
488   private static boolean isTableProcedure(Procedure proc) {
489     return proc instanceof TableProcedureInterface;
490   }
491 
492   private static TableName getTableName(Procedure proc) {
493     return ((TableProcedureInterface)proc).getTableName();
494   }
495 
496   // ============================================================================
497   //  Server Queue Lookup Helpers
498   // ============================================================================
499   private ServerQueue getServerQueueWithLock(ServerName serverName) {
500     schedLock.lock();
501     try {
502       return getServerQueue(serverName);
503     } finally {
504       schedLock.unlock();
505     }
506   }
507 
508   private ServerQueue getServerQueue(ServerName serverName) {
509     int index = getBucketIndex(serverBuckets, serverName.hashCode());
510     Queue<ServerName> root = getTreeRoot(serverBuckets, index);
511     Queue<ServerName> node = AvlTree.get(root, serverName);
512     if (node != null) return (ServerQueue)node;
513 
514     node = new ServerQueue(serverName);
515     serverBuckets[index] = AvlTree.insert(root, node);
516     return (ServerQueue)node;
517   }
518 
519   private void removeServerQueue(ServerName serverName) {
520     int index = getBucketIndex(serverBuckets, serverName.hashCode());
521     serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName);
522   }
523 
524   @SuppressWarnings("unchecked")
525   private static <T extends Comparable<T>> Queue<T> getTreeRoot(Object[] buckets, int index) {
526     return (Queue<T>) buckets[index];
527   }
528 
529   private static int getBucketIndex(Object[] buckets, int hashCode) {
530     return Math.abs(hashCode) % buckets.length;
531   }
532 
533   private static boolean isServerProcedure(Procedure proc) {
534     return proc instanceof ServerProcedureInterface;
535   }
536 
537   private static ServerName getServerName(Procedure proc) {
538     return ((ServerProcedureInterface)proc).getServerName();
539   }
540 
541   // ============================================================================
542   //  Table and Server Queue Implementation
543   // ============================================================================
544   public static class ServerQueue extends QueueImpl<ServerName> {
545     public ServerQueue(ServerName serverName) {
546       super(serverName);
547     }
548 
549     public boolean requireExclusiveLock(Procedure proc) {
550       ServerProcedureInterface spi = (ServerProcedureInterface)proc;
551       switch (spi.getServerOperationType()) {
552         case CRASH_HANDLER:
553           return true;
554         default:
555           break;
556       }
557       throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType());
558     }
559   }
560 
561   public static class TableQueue extends QueueImpl<TableName> {
562     private TableLock tableLock = null;
563 
564     public TableQueue(TableName tableName, int priority) {
565       super(tableName, priority);
566     }
567 
568     // TODO: We can abort pending/in-progress operation if the new call is
569     //       something like drop table. We can Override addBack(),
570     //       check the type and abort all the in-flight procedurs.
571     private boolean canAbortPendingOperations(Procedure proc) {
572       TableProcedureInterface tpi = (TableProcedureInterface)proc;
573       switch (tpi.getTableOperationType()) {
574         case DELETE:
575           return true;
576         default:
577           return false;
578       }
579     }
580 
581     public boolean requireExclusiveLock(Procedure proc) {
582       TableProcedureInterface tpi = (TableProcedureInterface)proc;
583       switch (tpi.getTableOperationType()) {
584         case CREATE:
585         case DELETE:
586         case DISABLE:
587         case EDIT:
588         case ENABLE:
589           return true;
590         case READ:
591           return false;
592         default:
593           break;
594       }
595       throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType());
596     }
597 
598     private synchronized boolean trySharedLock(final TableLockManager lockManager,
599         final String purpose) {
600       if (hasExclusiveLock()) return false;
601 
602       // Take zk-read-lock
603       TableName tableName = getKey();
604       tableLock = lockManager.readLock(tableName, purpose);
605       try {
606         tableLock.acquire();
607       } catch (IOException e) {
608         LOG.error("failed acquire read lock on " + tableName, e);
609         tableLock = null;
610         return false;
611       }
612 
613       trySharedLock();
614       return true;
615     }
616 
617     private synchronized void releaseSharedLock(final TableLockManager lockManager) {
618       releaseTableLock(lockManager, isSingleSharedLock());
619       releaseSharedLock();
620     }
621 
622     private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager,
623         final String purpose) {
624       // Take zk-write-lock
625       TableName tableName = getKey();
626       tableLock = lockManager.writeLock(tableName, purpose);
627       try {
628         tableLock.acquire();
629       } catch (IOException e) {
630         LOG.error("failed acquire write lock on " + tableName, e);
631         tableLock = null;
632         return false;
633       }
634       return true;
635     }
636 
637     private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) {
638       releaseTableLock(lockManager, true);
639     }
640 
641     private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
642       for (int i = 0; i < 3; ++i) {
643         try {
644           tableLock.release();
645           if (reset) {
646             tableLock = null;
647           }
648           break;
649         } catch (IOException e) {
650           LOG.warn("Could not release the table write-lock", e);
651         }
652       }
653     }
654   }
655 
656   // ============================================================================
657   //  Locking Helpers
658   // ============================================================================
659   /**
660    * Try to acquire the exclusive lock on the specified table.
661    * other operations in the table-queue will be executed after the lock is released.
662    * @param procedure the procedure trying to acquire the lock
663    * @param table Table to lock
664    * @return true if we were able to acquire the lock on the table, otherwise false.
665    */
666   public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) {
667     schedLock.lock();
668     TableQueue queue = getTableQueue(table);
669 
670     if (!queue.tryExclusiveLock(procedure.getProcId())) {
671       schedLock.unlock();
672       return false;
673     }
674 
675     removeFromRunQueue(tableRunQueue, queue);
676     schedLock.unlock();
677 
678     // Zk lock is expensive...
679     boolean hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
680     if (!hasXLock) {
681       schedLock.lock();
682       queue.releaseExclusiveLock();
683       addToRunQueue(tableRunQueue, queue);
684       schedLock.unlock();
685     }
686     return hasXLock;
687   }
688 
689   /**
690    * Release the exclusive lock taken with tryAcquireTableWrite()
691    * @param procedure the procedure releasing the lock
692    * @param table the name of the table that has the exclusive lock
693    */
694   public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) {
695     schedLock.lock();
696     TableQueue queue = getTableQueue(table);
697     schedLock.unlock();
698 
699     // Zk lock is expensive...
700     queue.releaseZkExclusiveLock(lockManager);
701 
702     schedLock.lock();
703     queue.releaseExclusiveLock();
704     addToRunQueue(tableRunQueue, queue);
705     schedLock.unlock();
706   }
707 
708   /**
709    * Try to acquire the shared lock on the specified table.
710    * other "read" operations in the table-queue may be executed concurrently,
711    * @param procedure the procedure trying to acquire the lock
712    * @param table Table to lock
713    * @return true if we were able to acquire the lock on the table, otherwise false.
714    */
715   public boolean tryAcquireTableSharedLock(final Procedure procedure, final TableName table) {
716     return tryAcquireTableQueueSharedLock(procedure, table) != null;
717   }
718 
719   private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure,
720       final TableName table) {
721     TableQueue queue = getTableQueueWithLock(table);
722     if (!queue.trySharedLock(lockManager, procedure.toString())) {
723       return null;
724     }
725     return queue;
726   }
727 
728   /**
729    * Release the shared lock taken with tryAcquireTableRead()
730    * @param procedure the procedure releasing the lock
731    * @param table the name of the table that has the shared lock
732    */
733   public void releaseTableSharedLock(final Procedure procedure, final TableName table) {
734     getTableQueueWithLock(table).releaseSharedLock(lockManager);
735   }
736 
737   /**
738    * Tries to remove the queue and the table-lock of the specified table.
739    * If there are new operations pending (e.g. a new create),
740    * the remove will not be performed.
741    * @param table the name of the table that should be marked as deleted
742    * @return true if deletion succeeded, false otherwise meaning that there are
743    *     other new operations pending for that table (e.g. a new create).
744    */
745   protected boolean markTableAsDeleted(final TableName table) {
746     final ReentrantLock l = schedLock;
747     l.lock();
748     try {
749       TableQueue queue = getTableQueue(table);
750       if (queue == null) return true;
751 
752       if (queue.isEmpty() && queue.tryExclusiveLock(0)) {
753         // remove the table from the run-queue and the map
754         if (IterableList.isLinked(queue)) {
755           tableRunQueue.remove(queue);
756         }
757 
758         // Remove the table lock
759         try {
760           lockManager.tableDeleted(table);
761         } catch (IOException e) {
762           LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical
763         }
764 
765         removeTableQueue(table);
766       } else {
767         // TODO: If there are no create, we can drop all the other ops
768         return false;
769       }
770     } finally {
771       l.unlock();
772     }
773     return true;
774   }
775 
776   // ============================================================================
777   //  Server Locking Helpers
778   // ============================================================================
779   /**
780    * Try to acquire the exclusive lock on the specified server.
781    * @see #releaseServerExclusiveLock(Procedure,ServerName)
782    * @param procedure the procedure trying to acquire the lock
783    * @param serverName Server to lock
784    * @return true if we were able to acquire the lock on the server, otherwise false.
785    */
786   public boolean tryAcquireServerExclusiveLock(final Procedure procedure,
787       final ServerName serverName) {
788     schedLock.lock();
789     try {
790       ServerQueue queue = getServerQueue(serverName);
791       if (queue.tryExclusiveLock(procedure.getProcId())) {
792         removeFromRunQueue(serverRunQueue, queue);
793         return true;
794       }
795     } finally {
796       schedLock.unlock();
797     }
798     return false;
799   }
800 
801   /**
802    * Release the exclusive lock
803    * @see #tryAcquireServerExclusiveLock(Procedure,ServerName)
804    * @param procedure the procedure releasing the lock
805    * @param serverName the server that has the exclusive lock
806    */
807   public void releaseServerExclusiveLock(final Procedure procedure,
808       final ServerName serverName) {
809     schedLock.lock();
810     try {
811       ServerQueue queue = getServerQueue(serverName);
812       queue.releaseExclusiveLock();
813       addToRunQueue(serverRunQueue, queue);
814     } finally {
815       schedLock.unlock();
816     }
817   }
818 
819   /**
820    * Try to acquire the shared lock on the specified server.
821    * @see #releaseServerSharedLock(Procedure,ServerName)
822    * @param procedure the procedure releasing the lock
823    * @param serverName Server to lock
824    * @return true if we were able to acquire the lock on the server, otherwise false.
825    */
826   public boolean tryAcquireServerSharedLock(final Procedure procedure,
827       final ServerName serverName) {
828     return getServerQueueWithLock(serverName).trySharedLock();
829   }
830 
831   /**
832    * Release the shared lock taken
833    * @see #tryAcquireServerSharedLock(Procedure,ServerName)
834    * @param procedure the procedure releasing the lock
835    * @param serverName the server that has the shared lock
836    */
837   public void releaseServerSharedLock(final Procedure procedure,
838       final ServerName serverName) {
839     getServerQueueWithLock(serverName).releaseSharedLock();
840   }
841 
842   // ============================================================================
843   //  Generic Helpers
844   // ============================================================================
845   private static interface QueueInterface {
846     boolean isAvailable();
847     boolean isEmpty();
848     int size();
849 
850     void add(Procedure proc, boolean addFront);
851     boolean requireExclusiveLock(Procedure proc);
852     Procedure peek();
853     Procedure poll();
854 
855     boolean isSuspended();
856   }
857 
858   private static abstract class Queue<TKey extends Comparable<TKey>> implements QueueInterface {
859     private Queue<TKey> avlRight = null;
860     private Queue<TKey> avlLeft = null;
861     private int avlHeight = 1;
862 
863     private Queue<TKey> iterNext = null;
864     private Queue<TKey> iterPrev = null;
865     private boolean suspended = false;
866 
867     private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
868     private int sharedLock = 0;
869 
870     private final TKey key;
871     private final int priority;
872 
873     public Queue(TKey key) {
874       this(key, 1);
875     }
876 
877     public Queue(TKey key, int priority) {
878       this.key = key;
879       this.priority = priority;
880     }
881 
882     protected TKey getKey() {
883       return key;
884     }
885 
886     protected int getPriority() {
887       return priority;
888     }
889 
890     /**
891      * True if the queue is not in the run-queue and it is owned by an event.
892      */
893     public boolean isSuspended() {
894       return suspended;
895     }
896 
897     protected boolean setSuspended(boolean isSuspended) {
898       if (this.suspended == isSuspended) return false;
899       this.suspended = isSuspended;
900       return true;
901     }
902 
903     // ======================================================================
904     //  Read/Write Locking helpers
905     // ======================================================================
906     public synchronized boolean isLocked() {
907       return hasExclusiveLock() || sharedLock > 0;
908     }
909 
910     public synchronized boolean hasExclusiveLock() {
911       return this.exclusiveLockProcIdOwner != Long.MIN_VALUE;
912     }
913 
914     public synchronized boolean trySharedLock() {
915       if (hasExclusiveLock()) return false;
916       sharedLock++;
917       return true;
918     }
919 
920     public synchronized void releaseSharedLock() {
921       sharedLock--;
922     }
923 
924     protected synchronized boolean isSingleSharedLock() {
925       return sharedLock == 1;
926     }
927 
928     public synchronized boolean tryExclusiveLock(long procIdOwner) {
929       assert procIdOwner != Long.MIN_VALUE;
930       if (isLocked()) return false;
931       exclusiveLockProcIdOwner = procIdOwner;
932       return true;
933     }
934 
935     public synchronized void releaseExclusiveLock() {
936       exclusiveLockProcIdOwner = Long.MIN_VALUE;
937     }
938 
939     // This should go away when we have the new AM and its events
940     // and we move xlock to the lock-event-queue.
941     public synchronized boolean isAvailable() {
942       return !hasExclusiveLock() && !isEmpty();
943     }
944 
945     // ======================================================================
946     //  Generic Helpers
947     // ======================================================================
948     public int compareKey(TKey cmpKey) {
949       return key.compareTo(cmpKey);
950     }
951 
952     public int compareTo(Queue<TKey> other) {
953       return compareKey(other.key);
954     }
955 
956     @Override
957     public String toString() {
958       return String.format("%s(%s)", getClass().getSimpleName(), key);
959     }
960   }
961 
962   // ======================================================================
963   //  Helper Data Structures
964   // ======================================================================
965   private static abstract class QueueImpl<TKey extends Comparable<TKey>> extends Queue<TKey> {
966     private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>();
967 
968     public QueueImpl(TKey key) {
969       super(key);
970     }
971 
972     public QueueImpl(TKey key, int priority) {
973       super(key, priority);
974     }
975 
976     public void add(final Procedure proc, final boolean addToFront) {
977       if (addToFront) {
978         addFront(proc);
979       } else {
980         addBack(proc);
981       }
982     }
983 
984     protected void addFront(final Procedure proc) {
985       runnables.addFirst(proc);
986     }
987 
988     protected void addBack(final Procedure proc) {
989       runnables.addLast(proc);
990     }
991 
992     public Procedure peek() {
993       return runnables.peek();
994     }
995 
996     @Override
997     public Procedure poll() {
998       return runnables.poll();
999     }
1000 
1001     @Override
1002     public boolean isEmpty() {
1003       return runnables.isEmpty();
1004     }
1005 
1006     public int size() {
1007       return runnables.size();
1008     }
1009   }
1010 
1011   private static class FairQueue<T extends Comparable<T>> {
1012     private final int quantum;
1013 
1014     private Queue<T> currentQueue = null;
1015     private Queue<T> queueHead = null;
1016     private int currentQuantum = 0;
1017 
1018     public FairQueue() {
1019       this(1);
1020     }
1021 
1022     public FairQueue(int quantum) {
1023       this.quantum = quantum;
1024     }
1025 
1026     public void add(Queue<T> queue) {
1027       queueHead = IterableList.append(queueHead, queue);
1028       if (currentQueue == null) setNextQueue(queueHead);
1029     }
1030 
1031     public void remove(Queue<T> queue) {
1032       Queue<T> nextQueue = queue.iterNext;
1033       queueHead = IterableList.remove(queueHead, queue);
1034       if (currentQueue == queue) {
1035         setNextQueue(queueHead != null ? nextQueue : null);
1036       }
1037     }
1038 
1039     public Queue<T> poll() {
1040       if (currentQuantum == 0) {
1041         if (!nextQueue()) {
1042           return null; // nothing here
1043         }
1044         currentQuantum = calculateQuantum(currentQueue) - 1;
1045       } else {
1046         currentQuantum--;
1047       }
1048 
1049       // This should go away when we have the new AM and its events
1050       if (!currentQueue.isAvailable()) {
1051         Queue<T> lastQueue = currentQueue;
1052         do {
1053           if (!nextQueue())
1054             return null;
1055         } while (currentQueue != lastQueue && !currentQueue.isAvailable());
1056 
1057         currentQuantum = calculateQuantum(currentQueue) - 1;
1058       }
1059       return currentQueue;
1060     }
1061 
1062     private boolean nextQueue() {
1063       if (currentQueue == null) return false;
1064       currentQueue = currentQueue.iterNext;
1065       return currentQueue != null;
1066     }
1067 
1068     private void setNextQueue(Queue<T> queue) {
1069       currentQueue = queue;
1070       if (queue != null) {
1071         currentQuantum = calculateQuantum(currentQueue);
1072       } else {
1073         currentQuantum = 0;
1074       }
1075     }
1076 
1077     private int calculateQuantum(final Queue queue) {
1078       return Math.max(1, queue.getPriority() * quantum); // TODO
1079     }
1080   }
1081 
1082   private static class AvlTree {
1083     public static <T extends Comparable<T>> Queue<T> get(Queue<T> root, T key) {
1084       while (root != null) {
1085         int cmp = root.compareKey(key);
1086         if (cmp > 0) {
1087           root = root.avlLeft;
1088         } else if (cmp < 0) {
1089           root = root.avlRight;
1090         } else {
1091           return root;
1092         }
1093       }
1094       return null;
1095     }
1096 
1097     public static <T extends Comparable<T>> Queue<T> getFirst(Queue<T> root) {
1098       if (root != null) {
1099         while (root.avlLeft != null) {
1100           root = root.avlLeft;
1101         }
1102       }
1103       return root;
1104     }
1105 
1106     public static <T extends Comparable<T>> Queue<T> getLast(Queue<T> root) {
1107       if (root != null) {
1108         while (root.avlRight != null) {
1109           root = root.avlRight;
1110         }
1111       }
1112       return root;
1113     }
1114 
1115     public static <T extends Comparable<T>> Queue<T> insert(Queue<T> root, Queue<T> node) {
1116       if (root == null) return node;
1117       if (node.compareTo(root) < 0) {
1118         root.avlLeft = insert(root.avlLeft, node);
1119       } else {
1120         root.avlRight = insert(root.avlRight, node);
1121       }
1122       return balance(root);
1123     }
1124 
1125     private static <T extends Comparable<T>> Queue<T> removeMin(Queue<T> p) {
1126       if (p.avlLeft == null)
1127         return p.avlRight;
1128       p.avlLeft = removeMin(p.avlLeft);
1129       return balance(p);
1130     }
1131 
1132     public static <T extends Comparable<T>> Queue<T> remove(Queue<T> root, T key) {
1133       if (root == null) return null;
1134 
1135       int cmp = root.compareKey(key);
1136       if (cmp == 0) {
1137         Queue<T> q = root.avlLeft;
1138         Queue<T> r = root.avlRight;
1139         if (r == null) return q;
1140         Queue<T> min = getFirst(r);
1141         min.avlRight = removeMin(r);
1142         min.avlLeft = q;
1143         return balance(min);
1144       } else if (cmp > 0) {
1145         root.avlLeft = remove(root.avlLeft, key);
1146       } else /* if (cmp < 0) */ {
1147         root.avlRight = remove(root.avlRight, key);
1148       }
1149       return balance(root);
1150     }
1151 
1152     private static <T extends Comparable<T>> Queue<T> balance(Queue<T> p) {
1153       fixHeight(p);
1154       int balance = balanceFactor(p);
1155       if (balance == 2) {
1156         if (balanceFactor(p.avlRight) < 0) {
1157           p.avlRight = rotateRight(p.avlRight);
1158         }
1159         return rotateLeft(p);
1160       } else if (balance == -2) {
1161         if (balanceFactor(p.avlLeft) > 0) {
1162           p.avlLeft = rotateLeft(p.avlLeft);
1163         }
1164         return rotateRight(p);
1165       }
1166       return p;
1167     }
1168 
1169     private static <T extends Comparable<T>> Queue<T> rotateRight(Queue<T> p) {
1170       Queue<T> q = p.avlLeft;
1171       p.avlLeft = q.avlRight;
1172       q.avlRight = p;
1173       fixHeight(p);
1174       fixHeight(q);
1175       return q;
1176     }
1177 
1178     private static <T extends Comparable<T>> Queue<T> rotateLeft(Queue<T> q) {
1179       Queue<T> p = q.avlRight;
1180       q.avlRight = p.avlLeft;
1181       p.avlLeft = q;
1182       fixHeight(q);
1183       fixHeight(p);
1184       return p;
1185     }
1186 
1187     private static <T extends Comparable<T>> void fixHeight(Queue<T> node) {
1188       int heightLeft = height(node.avlLeft);
1189       int heightRight = height(node.avlRight);
1190       node.avlHeight = 1 + Math.max(heightLeft, heightRight);
1191     }
1192 
1193     private static <T extends Comparable<T>> int height(Queue<T> node) {
1194       return node != null ? node.avlHeight : 0;
1195     }
1196 
1197     private static <T extends Comparable<T>> int balanceFactor(Queue<T> node) {
1198       return height(node.avlRight) - height(node.avlLeft);
1199     }
1200   }
1201 
1202   private static class IterableList {
1203     public static <T extends Comparable<T>> Queue<T> prepend(Queue<T> head, Queue<T> node) {
1204       assert !isLinked(node) : node + " is already linked";
1205       if (head != null) {
1206         Queue<T> tail = head.iterPrev;
1207         tail.iterNext = node;
1208         head.iterPrev = node;
1209         node.iterNext = head;
1210         node.iterPrev = tail;
1211       } else {
1212         node.iterNext = node;
1213         node.iterPrev = node;
1214       }
1215       return node;
1216     }
1217 
1218     public static <T extends Comparable<T>> Queue<T> append(Queue<T> head, Queue<T> node) {
1219       assert !isLinked(node) : node + " is already linked";
1220       if (head != null) {
1221         Queue<T> tail = head.iterPrev;
1222         tail.iterNext = node;
1223         node.iterNext = head;
1224         node.iterPrev = tail;
1225         head.iterPrev = node;
1226         return head;
1227       }
1228       node.iterNext = node;
1229       node.iterPrev = node;
1230       return node;
1231     }
1232 
1233     public static <T extends Comparable<T>> Queue<T> appendList(Queue<T> head, Queue<T> otherHead) {
1234       if (head == null) return otherHead;
1235       if (otherHead == null) return head;
1236 
1237       Queue<T> tail = head.iterPrev;
1238       Queue<T> otherTail = otherHead.iterPrev;
1239       tail.iterNext = otherHead;
1240       otherHead.iterPrev = tail;
1241       otherTail.iterNext = head;
1242       head.iterPrev = otherTail;
1243       return head;
1244     }
1245 
1246     private static <T extends Comparable<T>> Queue<T> remove(Queue<T> head, Queue<T> node) {
1247       assert isLinked(node) : node + " is not linked";
1248       if (node != node.iterNext) {
1249         node.iterPrev.iterNext = node.iterNext;
1250         node.iterNext.iterPrev = node.iterPrev;
1251         head = (head == node) ? node.iterNext : head;
1252       } else {
1253         head = null;
1254       }
1255       node.iterNext = null;
1256       node.iterPrev = null;
1257       return head;
1258     }
1259 
1260     private static <T extends Comparable<T>> boolean isLinked(Queue<T> node) {
1261       return node.iterPrev != null && node.iterNext != null;
1262     }
1263   }
1264 }