001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableExistsException;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
032import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
033import org.apache.hadoop.hbase.procedure2.LockAndQueue;
034import org.apache.hadoop.hbase.procedure2.LockStatus;
035import org.apache.hadoop.hbase.procedure2.LockedResource;
036import org.apache.hadoop.hbase.procedure2.LockedResourceType;
037import org.apache.hadoop.hbase.procedure2.Procedure;
038import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
039import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
041import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
047
048/**
049 * ProcedureScheduler for the Master Procedures.
050 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
051 * that can be executed without having to wait on a lock.
052 * Most of the master operations can be executed concurrently, if they
053 * are operating on different tables (e.g. two create table procedures can be performed
054 * at the same time) or against two different servers; say two servers that crashed at
055 * about the same time.
056 *
057 * <p>Each procedure should implement an Interface providing information for this queue.
058 * For example table related procedures should implement TableProcedureInterface.
059 * Each procedure will be pushed in its own queue, and based on the operation type
060 * we may make smarter decisions: e.g. we can abort all the operations preceding
061 * a delete table, or similar.
062 *
063 * <h4>Concurrency control</h4>
064 * Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap,
065 * serverBuckets) is controlled by schedLock(). This mainly includes:<br>
066 * <ul>
067 *   <li>
068 *     {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue
069 *     when:
070 *     <ol>
071 *       <li>Queue was empty before push (so must have been out of run-queue)</li>
072 *       <li>Child procedure is added (which means parent procedure holds exclusive lock, and it
073 *           must have moved Queue out of run-queue)</li>
074 *     </ol>
075 *   </li>
076 *   <li>
077 *     {@link #poll(long)}: A poll will remove a Queue from run-queue when:
078 *     <ol>
079 *       <li>Queue becomes empty after poll</li>
080 *       <li>Exclusive lock is requested by polled procedure and lock is available (returns the
081 *           procedure)</li>
082 *       <li>Exclusive lock is requested but lock is not available (returns null)</li>
083 *       <li>Polled procedure is child of parent holding exclusive lock and the next procedure is
084 *           not a child</li>
085 *     </ol>
086 *   </li>
087 *   <li>
088 *     Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
089 *     <ol>
090 *       <li>Exclusive lock</li>
091 *       <li>Last shared lock (in case queue was removed because next procedure in queue required
092 *           exclusive lock)</li>
093 *     </ol>
094 *   </li>
095 * </ul>
096 */
097@InterfaceAudience.Private
098public class MasterProcedureScheduler extends AbstractProcedureScheduler {
099  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
100
101  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
102    (n, k) -> n.compareKey((ServerName) k);
103  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
104    (n, k) -> n.compareKey((TableName) k);
105  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
106    (n, k) -> n.compareKey((String) k);
107  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
108    (n, k) -> n.compareKey((TableName) k);
109
110  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
111  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
112  private final FairQueue<String> peerRunQueue = new FairQueue<>();
113  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
114
115  private final ServerQueue[] serverBuckets = new ServerQueue[128];
116  private TableQueue tableMap = null;
117  private PeerQueue peerMap = null;
118  private MetaQueue metaMap = null;
119
120  private final SchemaLocking locking;
121
122  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
123    locking = new SchemaLocking(procedureRetriever);
124  }
125
126  @Override
127  public void yield(final Procedure proc) {
128    push(proc, false, true);
129  }
130
131  @Override
132  protected void enqueue(final Procedure proc, final boolean addFront) {
133    if (isMetaProcedure(proc)) {
134      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
135    } else if (isTableProcedure(proc)) {
136      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
137    } else if (isServerProcedure(proc)) {
138      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
139      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
140    } else if (isPeerProcedure(proc)) {
141      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
142    } else {
143      // TODO: at the moment we only have Table and Server procedures
144      // if you are implementing a non-table/non-server procedure, you have two options: create
145      // a group for all the non-table/non-server procedures or try to find a key for your
146      // non-table/non-server procedures and implement something similar to the TableRunQueue.
147      throw new UnsupportedOperationException(
148        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
149    }
150  }
151
152  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
153      Procedure<?> proc, boolean addFront) {
154    queue.add(proc, addFront);
155    // For the following conditions, we will put the queue back into execution
156    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
157    // which means it can be executed immediately.
158    // 2. The exclusive lock for this queue has not been held.
159    // 3. The given procedure has the exclusive lock permission for this queue.
160    Supplier<String> reason = null;
161    if (proc.hasLock()) {
162      reason = () -> proc + " has lock";
163    } else if (proc.isLockedWhenLoading()) {
164      reason = () -> proc + " restores lock when restarting";
165    } else if (!queue.getLockStatus().hasExclusiveLock()) {
166      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
167    } else if (queue.getLockStatus().hasLockAccess(proc)) {
168      reason = () -> proc + " has the excusive lock access";
169    }
170    if (reason != null) {
171      addToRunQueue(fairq, queue, reason);
172    }
173  }
174
175  @Override
176  protected boolean queueHasRunnables() {
177    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
178      serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
179  }
180
181  @Override
182  protected Procedure dequeue() {
183    // meta procedure is always the first priority
184    Procedure<?> pollResult = doPoll(metaRunQueue);
185    // For now, let server handling have precedence over table handling; presumption is that it
186    // is more important handling crashed servers than it is running the
187    // enabling/disabling tables, etc.
188    if (pollResult == null) {
189      pollResult = doPoll(serverRunQueue);
190    }
191    if (pollResult == null) {
192      pollResult = doPoll(peerRunQueue);
193    }
194    if (pollResult == null) {
195      pollResult = doPoll(tableRunQueue);
196    }
197    return pollResult;
198  }
199
200  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
201    LockStatus s = rq.getLockStatus();
202    // if we have the lock access, we are ready
203    if (s.hasLockAccess(proc)) {
204      return true;
205    }
206    boolean xlockReq = rq.requireExclusiveLock(proc);
207    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
208    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
209    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
210  }
211
212  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
213    Queue<T> rq = fairq.poll();
214    if (rq == null || !rq.isAvailable()) {
215      return null;
216    }
217    // loop until we find out a procedure which is ready to run, or if we have checked all the
218    // procedures, then we give up and remove the queue from run queue.
219    for (int i = 0, n = rq.size(); i < n; i++) {
220      Procedure<?> proc = rq.poll();
221      if (isLockReady(proc, rq)) {
222        // the queue is empty, remove from run queue
223        if (rq.isEmpty()) {
224          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
225        }
226        return proc;
227      }
228      // we are not ready to run, add back and try the next procedure
229      rq.add(proc, false);
230    }
231    // no procedure is ready for execution, remove from run queue
232    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
233    return null;
234  }
235
236  @Override
237  public List<LockedResource> getLocks() {
238    schedLock();
239    try {
240      return locking.getLocks();
241    } finally {
242      schedUnlock();
243    }
244  }
245
246  @Override
247  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
248    schedLock();
249    try {
250      return locking.getLockResource(resourceType, resourceName);
251    } finally {
252      schedUnlock();
253    }
254  }
255
256  @Override
257  public void clear() {
258    schedLock();
259    try {
260      clearQueue();
261      locking.clear();
262    } finally {
263      schedUnlock();
264    }
265  }
266
267  private void clearQueue() {
268    // Remove Servers
269    for (int i = 0; i < serverBuckets.length; ++i) {
270      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
271      serverBuckets[i] = null;
272    }
273
274    // Remove Tables
275    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
276    tableMap = null;
277
278    // Remove Peers
279    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
280    peerMap = null;
281
282    assert size() == 0 : "expected queue size to be 0, got " + size();
283  }
284
285  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
286      FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
287    while (treeMap != null) {
288      Queue<T> node = AvlTree.getFirst(treeMap);
289      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
290      if (fairq != null) {
291        removeFromRunQueue(fairq, node, () -> "clear all queues");
292      }
293    }
294  }
295
296  private int queueSize(Queue<?> head) {
297    int count = 0;
298    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
299    while (iter.hasNext()) {
300      count += iter.next().size();
301    }
302    return count;
303  }
304
305  @Override
306  protected int queueSize() {
307    int count = 0;
308    for (ServerQueue serverMap : serverBuckets) {
309      count += queueSize(serverMap);
310    }
311    count += queueSize(tableMap);
312    count += queueSize(peerMap);
313    count += queueSize(metaMap);
314    return count;
315  }
316
317  @Override
318  public void completionCleanup(final Procedure proc) {
319    if (proc instanceof TableProcedureInterface) {
320      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
321      boolean tableDeleted;
322      if (proc.hasException()) {
323        Exception procEx = proc.getException().unwrapRemoteException();
324        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
325          // create failed because the table already exist
326          tableDeleted = !(procEx instanceof TableExistsException);
327        } else {
328          // the operation failed because the table does not exist
329          tableDeleted = (procEx instanceof TableNotFoundException);
330        }
331      } else {
332        // the table was deleted
333        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
334      }
335      if (tableDeleted) {
336        markTableAsDeleted(iProcTable.getTableName(), proc);
337        return;
338      }
339    } else if (proc instanceof PeerProcedureInterface) {
340      tryCleanupPeerQueue(getPeerId(proc), proc);
341    } else if (proc instanceof ServerProcedureInterface) {
342      tryCleanupServerQueue(getServerName(proc), proc);
343    } else {
344      // No cleanup for other procedure types, yet.
345      return;
346    }
347  }
348
349  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
350      Supplier<String> reason) {
351    if (LOG.isDebugEnabled()) {
352      LOG.debug("Add {} to run queue because: {}", queue, reason.get());
353    }
354    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
355      fairq.add(queue);
356    }
357  }
358
359  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
360      Queue<T> queue, Supplier<String> reason) {
361    if (LOG.isDebugEnabled()) {
362      LOG.debug("Remove {} from run queue because: {}", queue, reason.get());
363    }
364    if (AvlIterableList.isLinked(queue)) {
365      fairq.remove(queue);
366    }
367  }
368
369  // ============================================================================
370  //  Table Queue Lookup Helpers
371  // ============================================================================
372  private TableQueue getTableQueue(TableName tableName) {
373    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
374    if (node != null) return node;
375
376    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
377        locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
378    tableMap = AvlTree.insert(tableMap, node);
379    return node;
380  }
381
382  private void removeTableQueue(TableName tableName) {
383    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
384    locking.removeTableLock(tableName);
385  }
386
387  private static boolean isTableProcedure(Procedure<?> proc) {
388    return proc instanceof TableProcedureInterface;
389  }
390
391  private static TableName getTableName(Procedure<?> proc) {
392    return ((TableProcedureInterface)proc).getTableName();
393  }
394
395  // ============================================================================
396  //  Server Queue Lookup Helpers
397  // ============================================================================
398  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
399    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
400    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
401    if (node != null) {
402      return node;
403    }
404    int priority;
405    if (proc != null) {
406      priority = MasterProcedureUtil.getServerPriority(proc);
407    } else {
408      priority = 1;
409    }
410    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
411    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
412    return node;
413  }
414
415  private void removeServerQueue(ServerName serverName) {
416    int index = getBucketIndex(serverBuckets, serverName.hashCode());
417    serverBuckets[index] =
418      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
419    locking.removeServerLock(serverName);
420  }
421
422  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
423    schedLock();
424    try {
425      int index = getBucketIndex(serverBuckets, serverName.hashCode());
426      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
427      if (node == null) {
428        return;
429      }
430
431      LockAndQueue lock = locking.getServerLock(serverName);
432      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
433        removeFromRunQueue(serverRunQueue, node,
434          () -> "clean up server queue after " + proc + " completed");
435        removeServerQueue(serverName);
436      }
437    } finally {
438      schedUnlock();
439    }
440  }
441
442  private static int getBucketIndex(Object[] buckets, int hashCode) {
443    return Math.abs(hashCode) % buckets.length;
444  }
445
446  private static boolean isServerProcedure(Procedure<?> proc) {
447    return proc instanceof ServerProcedureInterface;
448  }
449
450  private static ServerName getServerName(Procedure<?> proc) {
451    return ((ServerProcedureInterface)proc).getServerName();
452  }
453
454  // ============================================================================
455  //  Peer Queue Lookup Helpers
456  // ============================================================================
457  private PeerQueue getPeerQueue(String peerId) {
458    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
459    if (node != null) {
460      return node;
461    }
462    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
463    peerMap = AvlTree.insert(peerMap, node);
464    return node;
465  }
466
467  private void removePeerQueue(String peerId) {
468    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
469    locking.removePeerLock(peerId);
470  }
471
472  private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
473    schedLock();
474    try {
475      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
476      if (queue == null) {
477        return;
478      }
479
480      final LockAndQueue lock = locking.getPeerLock(peerId);
481      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
482        removeFromRunQueue(peerRunQueue, queue,
483          () -> "clean up peer queue after " + procedure + " completed");
484        removePeerQueue(peerId);
485      }
486    } finally {
487      schedUnlock();
488    }
489  }
490
491  private static boolean isPeerProcedure(Procedure<?> proc) {
492    return proc instanceof PeerProcedureInterface;
493  }
494
495  private static String getPeerId(Procedure<?> proc) {
496    return ((PeerProcedureInterface) proc).getPeerId();
497  }
498
499  // ============================================================================
500  //  Meta Queue Lookup Helpers
501  // ============================================================================
502  private MetaQueue getMetaQueue() {
503    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
504    if (node != null) {
505      return node;
506    }
507    node = new MetaQueue(locking.getMetaLock());
508    metaMap = AvlTree.insert(metaMap, node);
509    return node;
510  }
511
512  private static boolean isMetaProcedure(Procedure<?> proc) {
513    return proc instanceof MetaProcedureInterface;
514  }
515  // ============================================================================
516  //  Table Locking Helpers
517  // ============================================================================
518  /**
519   * Get lock info for a resource of specified type and name and log details
520   */
521  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
522    if (!LOG.isDebugEnabled()) {
523      return;
524    }
525
526    LockedResource lockedResource = getLockResource(resourceType, resourceName);
527    if (lockedResource != null) {
528      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" +
529          lockedResource.getSharedLockCount();
530
531      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
532      if (proc != null) {
533        msg += ", exclusively locked by procId=" + proc.getProcId();
534      }
535      LOG.debug(msg);
536    }
537  }
538
539  /**
540   * Suspend the procedure if the specified table is already locked.
541   * Other operations in the table-queue will be executed after the lock is released.
542   * @param procedure the procedure trying to acquire the lock
543   * @param table Table to lock
544   * @return true if the procedure has to wait for the table to be available
545   */
546  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
547    schedLock();
548    try {
549      final String namespace = table.getNamespaceAsString();
550      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
551      final LockAndQueue tableLock = locking.getTableLock(table);
552      if (!namespaceLock.trySharedLock(procedure)) {
553        waitProcedure(namespaceLock, procedure);
554        logLockedResource(LockedResourceType.NAMESPACE, namespace);
555        return true;
556      }
557      if (!tableLock.tryExclusiveLock(procedure)) {
558        namespaceLock.releaseSharedLock();
559        waitProcedure(tableLock, procedure);
560        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
561        return true;
562      }
563      removeFromRunQueue(tableRunQueue, getTableQueue(table),
564        () -> procedure + " held the exclusive lock");
565      return false;
566    } finally {
567      schedUnlock();
568    }
569  }
570
571  /**
572   * Wake the procedures waiting for the specified table
573   * @param procedure the procedure releasing the lock
574   * @param table the name of the table that has the exclusive lock
575   */
576  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
577    schedLock();
578    try {
579      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
580      final LockAndQueue tableLock = locking.getTableLock(table);
581      int waitingCount = 0;
582      if (tableLock.releaseExclusiveLock(procedure)) {
583        waitingCount += wakeWaitingProcedures(tableLock);
584      }
585      if (namespaceLock.releaseSharedLock()) {
586        waitingCount += wakeWaitingProcedures(namespaceLock);
587      }
588      addToRunQueue(tableRunQueue, getTableQueue(table),
589        () -> procedure + " released the exclusive lock");
590      wakePollIfNeeded(waitingCount);
591    } finally {
592      schedUnlock();
593    }
594  }
595
596  /**
597   * Suspend the procedure if the specified table is already locked.
598   * other "read" operations in the table-queue may be executed concurrently,
599   * @param procedure the procedure trying to acquire the lock
600   * @param table Table to lock
601   * @return true if the procedure has to wait for the table to be available
602   */
603  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
604    return waitTableQueueSharedLock(procedure, table) == null;
605  }
606
607  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
608    schedLock();
609    try {
610      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
611      final LockAndQueue tableLock = locking.getTableLock(table);
612      if (!namespaceLock.trySharedLock(procedure)) {
613        waitProcedure(namespaceLock, procedure);
614        return null;
615      }
616
617      if (!tableLock.trySharedLock(procedure)) {
618        namespaceLock.releaseSharedLock();
619        waitProcedure(tableLock, procedure);
620        return null;
621      }
622
623      return getTableQueue(table);
624    } finally {
625      schedUnlock();
626    }
627  }
628
629  /**
630   * Wake the procedures waiting for the specified table
631   * @param procedure the procedure releasing the lock
632   * @param table the name of the table that has the shared lock
633   */
634  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
635    schedLock();
636    try {
637      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
638      final LockAndQueue tableLock = locking.getTableLock(table);
639      int waitingCount = 0;
640      if (tableLock.releaseSharedLock()) {
641        addToRunQueue(tableRunQueue, getTableQueue(table),
642          () -> procedure + " released the shared lock");
643        waitingCount += wakeWaitingProcedures(tableLock);
644      }
645      if (namespaceLock.releaseSharedLock()) {
646        waitingCount += wakeWaitingProcedures(namespaceLock);
647      }
648      wakePollIfNeeded(waitingCount);
649    } finally {
650      schedUnlock();
651    }
652  }
653
654  /**
655   * Tries to remove the queue and the table-lock of the specified table.
656   * If there are new operations pending (e.g. a new create),
657   * the remove will not be performed.
658   * @param table the name of the table that should be marked as deleted
659   * @param procedure the procedure that is removing the table
660   * @return true if deletion succeeded, false otherwise meaning that there are
661   *     other new operations pending for that table (e.g. a new create).
662   */
663  @VisibleForTesting
664  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
665    schedLock();
666    try {
667      final TableQueue queue = getTableQueue(table);
668      final LockAndQueue tableLock = locking.getTableLock(table);
669      if (queue == null) return true;
670
671      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
672        // remove the table from the run-queue and the map
673        if (AvlIterableList.isLinked(queue)) {
674          tableRunQueue.remove(queue);
675        }
676        removeTableQueue(table);
677      } else {
678        // TODO: If there are no create, we can drop all the other ops
679        return false;
680      }
681    } finally {
682      schedUnlock();
683    }
684    return true;
685  }
686
687  // ============================================================================
688  //  Region Locking Helpers
689  // ============================================================================
690  /**
691   * Suspend the procedure if the specified region is already locked.
692   * @param procedure the procedure trying to acquire the lock on the region
693   * @param regionInfo the region we are trying to lock
694   * @return true if the procedure has to wait for the regions to be available
695   */
696  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
697    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
698  }
699
700  /**
701   * Suspend the procedure if the specified set of regions are already locked.
702   * @param procedure the procedure trying to acquire the lock on the regions
703   * @param table the table name of the regions we are trying to lock
704   * @param regionInfo the list of regions we are trying to lock
705   * @return true if the procedure has to wait for the regions to be available
706   */
707  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
708      final RegionInfo... regionInfo) {
709    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
710    schedLock();
711    try {
712      assert table != null;
713      if (waitTableSharedLock(procedure, table)) {
714        return true;
715      }
716
717      // acquire region xlocks or wait
718      boolean hasLock = true;
719      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
720      for (int i = 0; i < regionInfo.length; ++i) {
721        assert regionInfo[i] != null;
722        assert regionInfo[i].getTable() != null;
723        assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
724        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
725
726        regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName());
727        if (!regionLocks[i].tryExclusiveLock(procedure)) {
728          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
729              regionLocks[i].getExclusiveLockProcIdOwner());
730          waitProcedure(regionLocks[i], procedure);
731          hasLock = false;
732          while (i-- > 0) {
733            regionLocks[i].releaseExclusiveLock(procedure);
734          }
735          break;
736        } else {
737          LOG.info("Took xlock for {}", procedure);
738        }
739      }
740
741      if (!hasLock) {
742        wakeTableSharedLock(procedure, table);
743      }
744      return !hasLock;
745    } finally {
746      schedUnlock();
747    }
748  }
749
750  /**
751   * Wake the procedures waiting for the specified region
752   * @param procedure the procedure that was holding the region
753   * @param regionInfo the region the procedure was holding
754   */
755  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
756    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
757  }
758
759  /**
760   * Wake the procedures waiting for the specified regions
761   * @param procedure the procedure that was holding the regions
762   * @param regionInfo the list of regions the procedure was holding
763   */
764  public void wakeRegions(final Procedure<?> procedure,final TableName table,
765      final RegionInfo... regionInfo) {
766    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
767    schedLock();
768    try {
769      int numProcs = 0;
770      final Procedure<?>[] nextProcs = new Procedure[regionInfo.length];
771      for (int i = 0; i < regionInfo.length; ++i) {
772        assert regionInfo[i].getTable().equals(table);
773        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
774
775        LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
776        if (regionLock.releaseExclusiveLock(procedure)) {
777          if (!regionLock.isWaitingQueueEmpty()) {
778            // release one procedure at the time since regions has an xlock
779            nextProcs[numProcs++] = regionLock.removeFirst();
780          } else {
781            locking.removeRegionLock(regionInfo[i].getEncodedName());
782          }
783        }
784      }
785
786      // awake procedures if any
787      for (int i = numProcs - 1; i >= 0; --i) {
788        wakeProcedure(nextProcs[i]);
789      }
790      wakePollIfNeeded(numProcs);
791      // release the table shared-lock.
792      wakeTableSharedLock(procedure, table);
793    } finally {
794      schedUnlock();
795    }
796  }
797
798  // ============================================================================
799  //  Namespace Locking Helpers
800  // ============================================================================
801  /**
802   * Suspend the procedure if the specified namespace is already locked.
803   * @see #wakeNamespaceExclusiveLock(Procedure,String)
804   * @param procedure the procedure trying to acquire the lock
805   * @param namespace Namespace to lock
806   * @return true if the procedure has to wait for the namespace to be available
807   */
808  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
809    schedLock();
810    try {
811      final LockAndQueue systemNamespaceTableLock =
812        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
813      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
814        waitProcedure(systemNamespaceTableLock, procedure);
815        logLockedResource(LockedResourceType.TABLE,
816          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
817        return true;
818      }
819
820      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
821      if (!namespaceLock.tryExclusiveLock(procedure)) {
822        systemNamespaceTableLock.releaseSharedLock();
823        waitProcedure(namespaceLock, procedure);
824        logLockedResource(LockedResourceType.NAMESPACE, namespace);
825        return true;
826      }
827      return false;
828    } finally {
829      schedUnlock();
830    }
831  }
832
833  /**
834   * Wake the procedures waiting for the specified namespace
835   * @see #waitNamespaceExclusiveLock(Procedure,String)
836   * @param procedure the procedure releasing the lock
837   * @param namespace the namespace that has the exclusive lock
838   */
839  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
840    schedLock();
841    try {
842      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
843      final LockAndQueue systemNamespaceTableLock =
844          locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
845      int waitingCount = 0;
846      if (namespaceLock.releaseExclusiveLock(procedure)) {
847        waitingCount += wakeWaitingProcedures(namespaceLock);
848      }
849      if (systemNamespaceTableLock.releaseSharedLock()) {
850        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
851          () -> procedure + " released namespace exclusive lock");
852        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
853      }
854      wakePollIfNeeded(waitingCount);
855    } finally {
856      schedUnlock();
857    }
858  }
859
860  // ============================================================================
861  //  Server Locking Helpers
862  // ============================================================================
863  /**
864   * Try to acquire the exclusive lock on the specified server.
865   * @see #wakeServerExclusiveLock(Procedure,ServerName)
866   * @param procedure the procedure trying to acquire the lock
867   * @param serverName Server to lock
868   * @return true if the procedure has to wait for the server to be available
869   */
870  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
871      final ServerName serverName) {
872    schedLock();
873    try {
874      final LockAndQueue lock = locking.getServerLock(serverName);
875      if (lock.tryExclusiveLock(procedure)) {
876        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
877        // so.
878        removeFromRunQueue(serverRunQueue,
879          getServerQueue(serverName,
880            procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
881              : null),
882          () -> procedure + " held exclusive lock");
883        return false;
884      }
885      waitProcedure(lock, procedure);
886      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
887      return true;
888    } finally {
889      schedUnlock();
890    }
891  }
892
893  /**
894   * Wake the procedures waiting for the specified server
895   * @see #waitServerExclusiveLock(Procedure,ServerName)
896   * @param procedure the procedure releasing the lock
897   * @param serverName the server that has the exclusive lock
898   */
899  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
900    schedLock();
901    try {
902      final LockAndQueue lock = locking.getServerLock(serverName);
903      // Only SCP will acquire/release server lock so do not need to check the return value here.
904      lock.releaseExclusiveLock(procedure);
905      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
906      // so.
907      addToRunQueue(serverRunQueue,
908        getServerQueue(serverName,
909          procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
910            : null), () -> procedure + " released exclusive lock");
911      int waitingCount = wakeWaitingProcedures(lock);
912      wakePollIfNeeded(waitingCount);
913    } finally {
914      schedUnlock();
915    }
916  }
917
918  // ============================================================================
919  //  Peer Locking Helpers
920  // ============================================================================
921  private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
922    return proc.getPeerOperationType() != PeerOperationType.REFRESH;
923  }
924
925  /**
926   * Try to acquire the exclusive lock on the specified peer.
927   * @see #wakePeerExclusiveLock(Procedure, String)
928   * @param procedure the procedure trying to acquire the lock
929   * @param peerId peer to lock
930   * @return true if the procedure has to wait for the peer to be available
931   */
932  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
933    schedLock();
934    try {
935      final LockAndQueue lock = locking.getPeerLock(peerId);
936      if (lock.tryExclusiveLock(procedure)) {
937        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
938          () -> procedure + " held exclusive lock");
939        return false;
940      }
941      waitProcedure(lock, procedure);
942      logLockedResource(LockedResourceType.PEER, peerId);
943      return true;
944    } finally {
945      schedUnlock();
946    }
947  }
948
949  /**
950   * Wake the procedures waiting for the specified peer
951   * @see #waitPeerExclusiveLock(Procedure, String)
952   * @param procedure the procedure releasing the lock
953   * @param peerId the peer that has the exclusive lock
954   */
955  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
956    schedLock();
957    try {
958      final LockAndQueue lock = locking.getPeerLock(peerId);
959      if (lock.releaseExclusiveLock(procedure)) {
960        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
961          () -> procedure + " released exclusive lock");
962        int waitingCount = wakeWaitingProcedures(lock);
963        wakePollIfNeeded(waitingCount);
964      }
965    } finally {
966      schedUnlock();
967    }
968  }
969
970  // ============================================================================
971  // Meta Locking Helpers
972  // ============================================================================
973  /**
974   * Try to acquire the exclusive lock on meta.
975   * @see #wakeMetaExclusiveLock(Procedure)
976   * @param procedure the procedure trying to acquire the lock
977   * @return true if the procedure has to wait for meta to be available
978   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
979   *             {@link RecoverMetaProcedure}.
980   */
981  @Deprecated
982  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
983    schedLock();
984    try {
985      final LockAndQueue lock = locking.getMetaLock();
986      if (lock.tryExclusiveLock(procedure)) {
987        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
988        return false;
989      }
990      waitProcedure(lock, procedure);
991      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
992      return true;
993    } finally {
994      schedUnlock();
995    }
996  }
997
998  /**
999   * Wake the procedures waiting for meta.
1000   * @see #waitMetaExclusiveLock(Procedure)
1001   * @param procedure the procedure releasing the lock
1002   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1003   *             {@link RecoverMetaProcedure}.
1004   */
1005  @Deprecated
1006  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1007    schedLock();
1008    try {
1009      final LockAndQueue lock = locking.getMetaLock();
1010      lock.releaseExclusiveLock(procedure);
1011      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1012      int waitingCount = wakeWaitingProcedures(lock);
1013      wakePollIfNeeded(waitingCount);
1014    } finally {
1015      schedUnlock();
1016    }
1017  }
1018
1019  /**
1020   * For debugging. Expensive.
1021   */
1022  @VisibleForTesting
1023  public String dumpLocks() throws IOException {
1024    schedLock();
1025    try {
1026      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1027      return this.locking.toString();
1028    } finally {
1029      schedUnlock();
1030    }
1031  }
1032}