001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableExistsException;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
031import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
032import org.apache.hadoop.hbase.procedure2.LockAndQueue;
033import org.apache.hadoop.hbase.procedure2.LockStatus;
034import org.apache.hadoop.hbase.procedure2.LockedResource;
035import org.apache.hadoop.hbase.procedure2.LockedResourceType;
036import org.apache.hadoop.hbase.procedure2.Procedure;
037import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
038import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
039import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046
047/**
048 * ProcedureScheduler for the Master Procedures.
049 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
050 * that can be executed without having to wait on a lock.
051 * Most of the master operations can be executed concurrently, if they
052 * are operating on different tables (e.g. two create table procedures can be performed
053 * at the same time) or against two different servers; say two servers that crashed at
054 * about the same time.
055 *
056 * <p>Each procedure should implement an Interface providing information for this queue.
057 * For example table related procedures should implement TableProcedureInterface.
058 * Each procedure will be pushed in its own queue, and based on the operation type
059 * we may make smarter decisions: e.g. we can abort all the operations preceding
060 * a delete table, or similar.
061 *
062 * <h4>Concurrency control</h4>
063 * Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap,
064 * serverBuckets) is controlled by schedLock(). This mainly includes:<br>
065 * <ul>
066 *   <li>
067 *     {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue
068 *     when:
069 *     <ol>
070 *       <li>Queue was empty before push (so must have been out of run-queue)</li>
071 *       <li>Child procedure is added (which means parent procedure holds exclusive lock, and it
072 *           must have moved Queue out of run-queue)</li>
073 *     </ol>
074 *   </li>
075 *   <li>
076 *     {@link #poll(long)}: A poll will remove a Queue from run-queue when:
077 *     <ol>
078 *       <li>Queue becomes empty after poll</li>
079 *       <li>Exclusive lock is requested by polled procedure and lock is available (returns the
080 *           procedure)</li>
081 *       <li>Exclusive lock is requested but lock is not available (returns null)</li>
082 *       <li>Polled procedure is child of parent holding exclusive lock and the next procedure is
083 *           not a child</li>
084 *     </ol>
085 *   </li>
086 *   <li>
087 *     Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
088 *     <ol>
089 *       <li>Exclusive lock</li>
090 *       <li>Last shared lock (in case queue was removed because next procedure in queue required
091 *           exclusive lock)</li>
092 *     </ol>
093 *   </li>
094 * </ul>
095 */
096@InterfaceAudience.Private
097public class MasterProcedureScheduler extends AbstractProcedureScheduler {
098  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
099
100  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
101    (n, k) -> n.compareKey((ServerName) k);
102  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
103    (n, k) -> n.compareKey((TableName) k);
104  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
105    (n, k) -> n.compareKey((String) k);
106  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
107    (n, k) -> n.compareKey((TableName) k);
108
109  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
110  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
111  private final FairQueue<String> peerRunQueue = new FairQueue<>();
112  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
113
114  private final ServerQueue[] serverBuckets = new ServerQueue[128];
115  private TableQueue tableMap = null;
116  private PeerQueue peerMap = null;
117  private MetaQueue metaMap = null;
118
119  private final SchemaLocking locking;
120
121  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
122    locking = new SchemaLocking(procedureRetriever);
123  }
124
125  @Override
126  public void yield(final Procedure proc) {
127    push(proc, false, true);
128  }
129
130  @Override
131  protected void enqueue(final Procedure proc, final boolean addFront) {
132    if (isMetaProcedure(proc)) {
133      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
134    } else if (isTableProcedure(proc)) {
135      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
136    } else if (isServerProcedure(proc)) {
137      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
138      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
139    } else if (isPeerProcedure(proc)) {
140      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
141    } else {
142      // TODO: at the moment we only have Table and Server procedures
143      // if you are implementing a non-table/non-server procedure, you have two options: create
144      // a group for all the non-table/non-server procedures or try to find a key for your
145      // non-table/non-server procedures and implement something similar to the TableRunQueue.
146      throw new UnsupportedOperationException(
147        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
148    }
149  }
150
151  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
152      Procedure<?> proc, boolean addFront) {
153    queue.add(proc, addFront);
154    // For the following conditions, we will put the queue back into execution
155    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
156    // which means it can be executed immediately.
157    // 2. The exclusive lock for this queue has not been held.
158    // 3. The given procedure has the exclusive lock permission for this queue.
159    Supplier<String> reason = null;
160    if (proc.hasLock()) {
161      reason = () -> proc + " has lock";
162    } else if (proc.isLockedWhenLoading()) {
163      reason = () -> proc + " restores lock when restarting";
164    } else if (!queue.getLockStatus().hasExclusiveLock()) {
165      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
166    } else if (queue.getLockStatus().hasLockAccess(proc)) {
167      reason = () -> proc + " has the excusive lock access";
168    }
169    if (reason != null) {
170      addToRunQueue(fairq, queue, reason);
171    }
172  }
173
174  @Override
175  protected boolean queueHasRunnables() {
176    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
177      serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
178  }
179
180  @Override
181  protected Procedure dequeue() {
182    // meta procedure is always the first priority
183    Procedure<?> pollResult = doPoll(metaRunQueue);
184    // For now, let server handling have precedence over table handling; presumption is that it
185    // is more important handling crashed servers than it is running the
186    // enabling/disabling tables, etc.
187    if (pollResult == null) {
188      pollResult = doPoll(serverRunQueue);
189    }
190    if (pollResult == null) {
191      pollResult = doPoll(peerRunQueue);
192    }
193    if (pollResult == null) {
194      pollResult = doPoll(tableRunQueue);
195    }
196    return pollResult;
197  }
198
199  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
200    LockStatus s = rq.getLockStatus();
201    // if we have the lock access, we are ready
202    if (s.hasLockAccess(proc)) {
203      return true;
204    }
205    boolean xlockReq = rq.requireExclusiveLock(proc);
206    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
207    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
208    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
209  }
210
211  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
212    Queue<T> rq = fairq.poll();
213    if (rq == null || !rq.isAvailable()) {
214      return null;
215    }
216    // loop until we find out a procedure which is ready to run, or if we have checked all the
217    // procedures, then we give up and remove the queue from run queue.
218    for (int i = 0, n = rq.size(); i < n; i++) {
219      Procedure<?> proc = rq.poll();
220      if (isLockReady(proc, rq)) {
221        // the queue is empty, remove from run queue
222        if (rq.isEmpty()) {
223          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
224        }
225        return proc;
226      }
227      // we are not ready to run, add back and try the next procedure
228      rq.add(proc, false);
229    }
230    // no procedure is ready for execution, remove from run queue
231    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
232    return null;
233  }
234
235  @Override
236  public List<LockedResource> getLocks() {
237    schedLock();
238    try {
239      return locking.getLocks();
240    } finally {
241      schedUnlock();
242    }
243  }
244
245  @Override
246  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
247    schedLock();
248    try {
249      return locking.getLockResource(resourceType, resourceName);
250    } finally {
251      schedUnlock();
252    }
253  }
254
255  @Override
256  public void clear() {
257    schedLock();
258    try {
259      clearQueue();
260      locking.clear();
261    } finally {
262      schedUnlock();
263    }
264  }
265
266  private void clearQueue() {
267    // Remove Servers
268    for (int i = 0; i < serverBuckets.length; ++i) {
269      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
270      serverBuckets[i] = null;
271    }
272
273    // Remove Tables
274    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
275    tableMap = null;
276
277    // Remove Peers
278    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
279    peerMap = null;
280
281    assert size() == 0 : "expected queue size to be 0, got " + size();
282  }
283
284  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
285      FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
286    while (treeMap != null) {
287      Queue<T> node = AvlTree.getFirst(treeMap);
288      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
289      if (fairq != null) {
290        removeFromRunQueue(fairq, node, () -> "clear all queues");
291      }
292    }
293  }
294
295  private int queueSize(Queue<?> head) {
296    int count = 0;
297    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
298    while (iter.hasNext()) {
299      count += iter.next().size();
300    }
301    return count;
302  }
303
304  @Override
305  protected int queueSize() {
306    int count = 0;
307    for (ServerQueue serverMap : serverBuckets) {
308      count += queueSize(serverMap);
309    }
310    count += queueSize(tableMap);
311    count += queueSize(peerMap);
312    count += queueSize(metaMap);
313    return count;
314  }
315
316  @Override
317  public void completionCleanup(final Procedure proc) {
318    if (proc instanceof TableProcedureInterface) {
319      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
320      boolean tableDeleted;
321      if (proc.hasException()) {
322        Exception procEx = proc.getException().unwrapRemoteException();
323        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
324          // create failed because the table already exist
325          tableDeleted = !(procEx instanceof TableExistsException);
326        } else {
327          // the operation failed because the table does not exist
328          tableDeleted = (procEx instanceof TableNotFoundException);
329        }
330      } else {
331        // the table was deleted
332        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
333      }
334      if (tableDeleted) {
335        markTableAsDeleted(iProcTable.getTableName(), proc);
336        return;
337      }
338    } else if (proc instanceof PeerProcedureInterface) {
339      tryCleanupPeerQueue(getPeerId(proc), proc);
340    } else if (proc instanceof ServerProcedureInterface) {
341      tryCleanupServerQueue(getServerName(proc), proc);
342    } else {
343      // No cleanup for other procedure types, yet.
344      return;
345    }
346  }
347
348  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
349      Supplier<String> reason) {
350    if (LOG.isTraceEnabled()) {
351      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
352    }
353    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
354      fairq.add(queue);
355    }
356  }
357
358  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
359      Queue<T> queue, Supplier<String> reason) {
360    if (LOG.isTraceEnabled()) {
361      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
362    }
363    if (AvlIterableList.isLinked(queue)) {
364      fairq.remove(queue);
365    }
366  }
367
368  // ============================================================================
369  //  Table Queue Lookup Helpers
370  // ============================================================================
371  private TableQueue getTableQueue(TableName tableName) {
372    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
373    if (node != null) return node;
374
375    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
376        locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
377    tableMap = AvlTree.insert(tableMap, node);
378    return node;
379  }
380
381  private void removeTableQueue(TableName tableName) {
382    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
383    locking.removeTableLock(tableName);
384  }
385
386  private static boolean isTableProcedure(Procedure<?> proc) {
387    return proc instanceof TableProcedureInterface;
388  }
389
390  private static TableName getTableName(Procedure<?> proc) {
391    return ((TableProcedureInterface)proc).getTableName();
392  }
393
394  // ============================================================================
395  //  Server Queue Lookup Helpers
396  // ============================================================================
397  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
398    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
399    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
400    if (node != null) {
401      return node;
402    }
403    int priority;
404    if (proc != null) {
405      priority = MasterProcedureUtil.getServerPriority(proc);
406    } else {
407      priority = 1;
408    }
409    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
410    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
411    return node;
412  }
413
414  private void removeServerQueue(ServerName serverName) {
415    int index = getBucketIndex(serverBuckets, serverName.hashCode());
416    serverBuckets[index] =
417      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
418    locking.removeServerLock(serverName);
419  }
420
421  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
422    schedLock();
423    try {
424      int index = getBucketIndex(serverBuckets, serverName.hashCode());
425      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
426      if (node == null) {
427        return;
428      }
429
430      LockAndQueue lock = locking.getServerLock(serverName);
431      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
432        removeFromRunQueue(serverRunQueue, node,
433          () -> "clean up server queue after " + proc + " completed");
434        removeServerQueue(serverName);
435      }
436    } finally {
437      schedUnlock();
438    }
439  }
440
441  private static int getBucketIndex(Object[] buckets, int hashCode) {
442    return Math.abs(hashCode) % buckets.length;
443  }
444
445  private static boolean isServerProcedure(Procedure<?> proc) {
446    return proc instanceof ServerProcedureInterface;
447  }
448
449  private static ServerName getServerName(Procedure<?> proc) {
450    return ((ServerProcedureInterface)proc).getServerName();
451  }
452
453  // ============================================================================
454  //  Peer Queue Lookup Helpers
455  // ============================================================================
456  private PeerQueue getPeerQueue(String peerId) {
457    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
458    if (node != null) {
459      return node;
460    }
461    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
462    peerMap = AvlTree.insert(peerMap, node);
463    return node;
464  }
465
466  private void removePeerQueue(String peerId) {
467    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
468    locking.removePeerLock(peerId);
469  }
470
471  private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
472    schedLock();
473    try {
474      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
475      if (queue == null) {
476        return;
477      }
478
479      final LockAndQueue lock = locking.getPeerLock(peerId);
480      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
481        removeFromRunQueue(peerRunQueue, queue,
482          () -> "clean up peer queue after " + procedure + " completed");
483        removePeerQueue(peerId);
484      }
485    } finally {
486      schedUnlock();
487    }
488  }
489
490  private static boolean isPeerProcedure(Procedure<?> proc) {
491    return proc instanceof PeerProcedureInterface;
492  }
493
494  private static String getPeerId(Procedure<?> proc) {
495    return ((PeerProcedureInterface) proc).getPeerId();
496  }
497
498  // ============================================================================
499  //  Meta Queue Lookup Helpers
500  // ============================================================================
501  private MetaQueue getMetaQueue() {
502    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
503    if (node != null) {
504      return node;
505    }
506    node = new MetaQueue(locking.getMetaLock());
507    metaMap = AvlTree.insert(metaMap, node);
508    return node;
509  }
510
511  private static boolean isMetaProcedure(Procedure<?> proc) {
512    return proc instanceof MetaProcedureInterface;
513  }
514  // ============================================================================
515  //  Table Locking Helpers
516  // ============================================================================
517  /**
518   * Get lock info for a resource of specified type and name and log details
519   */
520  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
521    if (!LOG.isDebugEnabled()) {
522      return;
523    }
524
525    LockedResource lockedResource = getLockResource(resourceType, resourceName);
526    if (lockedResource != null) {
527      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" +
528          lockedResource.getSharedLockCount();
529
530      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
531      if (proc != null) {
532        msg += ", exclusively locked by procId=" + proc.getProcId();
533      }
534      LOG.debug(msg);
535    }
536  }
537
538  /**
539   * Suspend the procedure if the specified table is already locked.
540   * Other operations in the table-queue will be executed after the lock is released.
541   * @param procedure the procedure trying to acquire the lock
542   * @param table Table to lock
543   * @return true if the procedure has to wait for the table to be available
544   */
545  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
546    schedLock();
547    try {
548      final String namespace = table.getNamespaceAsString();
549      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
550      final LockAndQueue tableLock = locking.getTableLock(table);
551      if (!namespaceLock.trySharedLock(procedure)) {
552        waitProcedure(namespaceLock, procedure);
553        logLockedResource(LockedResourceType.NAMESPACE, namespace);
554        return true;
555      }
556      if (!tableLock.tryExclusiveLock(procedure)) {
557        namespaceLock.releaseSharedLock();
558        waitProcedure(tableLock, procedure);
559        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
560        return true;
561      }
562      removeFromRunQueue(tableRunQueue, getTableQueue(table),
563        () -> procedure + " held the exclusive lock");
564      return false;
565    } finally {
566      schedUnlock();
567    }
568  }
569
570  /**
571   * Wake the procedures waiting for the specified table
572   * @param procedure the procedure releasing the lock
573   * @param table the name of the table that has the exclusive lock
574   */
575  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
576    schedLock();
577    try {
578      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
579      final LockAndQueue tableLock = locking.getTableLock(table);
580      int waitingCount = 0;
581      if (tableLock.releaseExclusiveLock(procedure)) {
582        waitingCount += wakeWaitingProcedures(tableLock);
583      }
584      if (namespaceLock.releaseSharedLock()) {
585        waitingCount += wakeWaitingProcedures(namespaceLock);
586      }
587      addToRunQueue(tableRunQueue, getTableQueue(table),
588        () -> procedure + " released the exclusive lock");
589      wakePollIfNeeded(waitingCount);
590    } finally {
591      schedUnlock();
592    }
593  }
594
595  /**
596   * Suspend the procedure if the specified table is already locked.
597   * other "read" operations in the table-queue may be executed concurrently,
598   * @param procedure the procedure trying to acquire the lock
599   * @param table Table to lock
600   * @return true if the procedure has to wait for the table to be available
601   */
602  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
603    return waitTableQueueSharedLock(procedure, table) == null;
604  }
605
606  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
607    schedLock();
608    try {
609      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
610      final LockAndQueue tableLock = locking.getTableLock(table);
611      if (!namespaceLock.trySharedLock(procedure)) {
612        waitProcedure(namespaceLock, procedure);
613        return null;
614      }
615
616      if (!tableLock.trySharedLock(procedure)) {
617        namespaceLock.releaseSharedLock();
618        waitProcedure(tableLock, procedure);
619        return null;
620      }
621
622      return getTableQueue(table);
623    } finally {
624      schedUnlock();
625    }
626  }
627
628  /**
629   * Wake the procedures waiting for the specified table
630   * @param procedure the procedure releasing the lock
631   * @param table the name of the table that has the shared lock
632   */
633  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
634    schedLock();
635    try {
636      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
637      final LockAndQueue tableLock = locking.getTableLock(table);
638      int waitingCount = 0;
639      if (tableLock.releaseSharedLock()) {
640        addToRunQueue(tableRunQueue, getTableQueue(table),
641          () -> procedure + " released the shared lock");
642        waitingCount += wakeWaitingProcedures(tableLock);
643      }
644      if (namespaceLock.releaseSharedLock()) {
645        waitingCount += wakeWaitingProcedures(namespaceLock);
646      }
647      wakePollIfNeeded(waitingCount);
648    } finally {
649      schedUnlock();
650    }
651  }
652
653  /**
654   * Tries to remove the queue and the table-lock of the specified table.
655   * If there are new operations pending (e.g. a new create),
656   * the remove will not be performed.
657   * @param table the name of the table that should be marked as deleted
658   * @param procedure the procedure that is removing the table
659   * @return true if deletion succeeded, false otherwise meaning that there are
660   *     other new operations pending for that table (e.g. a new create).
661   */
662  @VisibleForTesting
663  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
664    schedLock();
665    try {
666      final TableQueue queue = getTableQueue(table);
667      final LockAndQueue tableLock = locking.getTableLock(table);
668      if (queue == null) return true;
669
670      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
671        // remove the table from the run-queue and the map
672        if (AvlIterableList.isLinked(queue)) {
673          tableRunQueue.remove(queue);
674        }
675        removeTableQueue(table);
676      } else {
677        // TODO: If there are no create, we can drop all the other ops
678        return false;
679      }
680    } finally {
681      schedUnlock();
682    }
683    return true;
684  }
685
686  // ============================================================================
687  //  Region Locking Helpers
688  // ============================================================================
689  /**
690   * Suspend the procedure if the specified region is already locked.
691   * @param procedure the procedure trying to acquire the lock on the region
692   * @param regionInfo the region we are trying to lock
693   * @return true if the procedure has to wait for the regions to be available
694   */
695  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
696    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
697  }
698
699  /**
700   * Suspend the procedure if the specified set of regions are already locked.
701   * @param procedure the procedure trying to acquire the lock on the regions
702   * @param table the table name of the regions we are trying to lock
703   * @param regionInfo the list of regions we are trying to lock
704   * @return true if the procedure has to wait for the regions to be available
705   */
706  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
707      final RegionInfo... regionInfo) {
708    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
709    schedLock();
710    try {
711      assert table != null;
712      if (waitTableSharedLock(procedure, table)) {
713        return true;
714      }
715
716      // acquire region xlocks or wait
717      boolean hasLock = true;
718      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
719      for (int i = 0; i < regionInfo.length; ++i) {
720        assert regionInfo[i] != null;
721        assert regionInfo[i].getTable() != null;
722        assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
723        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
724
725        regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName());
726        if (!regionLocks[i].tryExclusiveLock(procedure)) {
727          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
728              regionLocks[i].getExclusiveLockProcIdOwner());
729          waitProcedure(regionLocks[i], procedure);
730          hasLock = false;
731          while (i-- > 0) {
732            regionLocks[i].releaseExclusiveLock(procedure);
733          }
734          break;
735        } else {
736          LOG.info("Took xlock for {}", procedure);
737        }
738      }
739
740      if (!hasLock) {
741        wakeTableSharedLock(procedure, table);
742      }
743      return !hasLock;
744    } finally {
745      schedUnlock();
746    }
747  }
748
749  /**
750   * Wake the procedures waiting for the specified region
751   * @param procedure the procedure that was holding the region
752   * @param regionInfo the region the procedure was holding
753   */
754  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
755    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
756  }
757
758  /**
759   * Wake the procedures waiting for the specified regions
760   * @param procedure the procedure that was holding the regions
761   * @param regionInfo the list of regions the procedure was holding
762   */
763  public void wakeRegions(final Procedure<?> procedure,final TableName table,
764      final RegionInfo... regionInfo) {
765    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
766    schedLock();
767    try {
768      int numProcs = 0;
769      final Procedure<?>[] nextProcs = new Procedure[regionInfo.length];
770      for (int i = 0; i < regionInfo.length; ++i) {
771        assert regionInfo[i].getTable().equals(table);
772        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
773
774        LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
775        if (regionLock.releaseExclusiveLock(procedure)) {
776          if (!regionLock.isWaitingQueueEmpty()) {
777            // release one procedure at the time since regions has an xlock
778            nextProcs[numProcs++] = regionLock.removeFirst();
779          } else {
780            locking.removeRegionLock(regionInfo[i].getEncodedName());
781          }
782        }
783      }
784
785      // awake procedures if any
786      for (int i = numProcs - 1; i >= 0; --i) {
787        wakeProcedure(nextProcs[i]);
788      }
789      wakePollIfNeeded(numProcs);
790      // release the table shared-lock.
791      wakeTableSharedLock(procedure, table);
792    } finally {
793      schedUnlock();
794    }
795  }
796
797  // ============================================================================
798  //  Namespace Locking Helpers
799  // ============================================================================
800  /**
801   * Suspend the procedure if the specified namespace is already locked.
802   * @see #wakeNamespaceExclusiveLock(Procedure,String)
803   * @param procedure the procedure trying to acquire the lock
804   * @param namespace Namespace to lock
805   * @return true if the procedure has to wait for the namespace to be available
806   */
807  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
808    schedLock();
809    try {
810      final LockAndQueue systemNamespaceTableLock =
811        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
812      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
813        waitProcedure(systemNamespaceTableLock, procedure);
814        logLockedResource(LockedResourceType.TABLE,
815          TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString());
816        return true;
817      }
818
819      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
820      if (!namespaceLock.tryExclusiveLock(procedure)) {
821        systemNamespaceTableLock.releaseSharedLock();
822        waitProcedure(namespaceLock, procedure);
823        logLockedResource(LockedResourceType.NAMESPACE, namespace);
824        return true;
825      }
826      return false;
827    } finally {
828      schedUnlock();
829    }
830  }
831
832  /**
833   * Wake the procedures waiting for the specified namespace
834   * @see #waitNamespaceExclusiveLock(Procedure,String)
835   * @param procedure the procedure releasing the lock
836   * @param namespace the namespace that has the exclusive lock
837   */
838  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
839    schedLock();
840    try {
841      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
842      final LockAndQueue systemNamespaceTableLock =
843        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
844      int waitingCount = 0;
845      if (namespaceLock.releaseExclusiveLock(procedure)) {
846        waitingCount += wakeWaitingProcedures(namespaceLock);
847      }
848      if (systemNamespaceTableLock.releaseSharedLock()) {
849        addToRunQueue(tableRunQueue,
850          getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME),
851          () -> procedure + " released namespace exclusive lock");
852        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
853      }
854      wakePollIfNeeded(waitingCount);
855    } finally {
856      schedUnlock();
857    }
858  }
859
860  // ============================================================================
861  //  Server Locking Helpers
862  // ============================================================================
863  /**
864   * Try to acquire the exclusive lock on the specified server.
865   * @see #wakeServerExclusiveLock(Procedure,ServerName)
866   * @param procedure the procedure trying to acquire the lock
867   * @param serverName Server to lock
868   * @return true if the procedure has to wait for the server to be available
869   */
870  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
871      final ServerName serverName) {
872    schedLock();
873    try {
874      final LockAndQueue lock = locking.getServerLock(serverName);
875      if (lock.tryExclusiveLock(procedure)) {
876        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
877        // so.
878        removeFromRunQueue(serverRunQueue,
879          getServerQueue(serverName,
880            procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
881              : null),
882          () -> procedure + " held exclusive lock");
883        return false;
884      }
885      waitProcedure(lock, procedure);
886      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
887      return true;
888    } finally {
889      schedUnlock();
890    }
891  }
892
893  /**
894   * Wake the procedures waiting for the specified server
895   * @see #waitServerExclusiveLock(Procedure,ServerName)
896   * @param procedure the procedure releasing the lock
897   * @param serverName the server that has the exclusive lock
898   */
899  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
900    schedLock();
901    try {
902      final LockAndQueue lock = locking.getServerLock(serverName);
903      // Only SCP will acquire/release server lock so do not need to check the return value here.
904      lock.releaseExclusiveLock(procedure);
905      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
906      // so.
907      addToRunQueue(serverRunQueue,
908        getServerQueue(serverName,
909          procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
910            : null), () -> procedure + " released exclusive lock");
911      int waitingCount = wakeWaitingProcedures(lock);
912      wakePollIfNeeded(waitingCount);
913    } finally {
914      schedUnlock();
915    }
916  }
917
918  // ============================================================================
919  //  Peer Locking Helpers
920  // ============================================================================
921  /**
922   * Try to acquire the exclusive lock on the specified peer.
923   * @see #wakePeerExclusiveLock(Procedure, String)
924   * @param procedure the procedure trying to acquire the lock
925   * @param peerId peer to lock
926   * @return true if the procedure has to wait for the peer to be available
927   */
928  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
929    schedLock();
930    try {
931      final LockAndQueue lock = locking.getPeerLock(peerId);
932      if (lock.tryExclusiveLock(procedure)) {
933        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
934          () -> procedure + " held exclusive lock");
935        return false;
936      }
937      waitProcedure(lock, procedure);
938      logLockedResource(LockedResourceType.PEER, peerId);
939      return true;
940    } finally {
941      schedUnlock();
942    }
943  }
944
945  /**
946   * Wake the procedures waiting for the specified peer
947   * @see #waitPeerExclusiveLock(Procedure, String)
948   * @param procedure the procedure releasing the lock
949   * @param peerId the peer that has the exclusive lock
950   */
951  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
952    schedLock();
953    try {
954      final LockAndQueue lock = locking.getPeerLock(peerId);
955      if (lock.releaseExclusiveLock(procedure)) {
956        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
957          () -> procedure + " released exclusive lock");
958        int waitingCount = wakeWaitingProcedures(lock);
959        wakePollIfNeeded(waitingCount);
960      }
961    } finally {
962      schedUnlock();
963    }
964  }
965
966  // ============================================================================
967  // Meta Locking Helpers
968  // ============================================================================
969  /**
970   * Try to acquire the exclusive lock on meta.
971   * @see #wakeMetaExclusiveLock(Procedure)
972   * @param procedure the procedure trying to acquire the lock
973   * @return true if the procedure has to wait for meta to be available
974   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
975   *             {@link RecoverMetaProcedure}.
976   */
977  @Deprecated
978  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
979    schedLock();
980    try {
981      final LockAndQueue lock = locking.getMetaLock();
982      if (lock.tryExclusiveLock(procedure)) {
983        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
984        return false;
985      }
986      waitProcedure(lock, procedure);
987      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
988      return true;
989    } finally {
990      schedUnlock();
991    }
992  }
993
994  /**
995   * Wake the procedures waiting for meta.
996   * @see #waitMetaExclusiveLock(Procedure)
997   * @param procedure the procedure releasing the lock
998   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
999   *             {@link RecoverMetaProcedure}.
1000   */
1001  @Deprecated
1002  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1003    schedLock();
1004    try {
1005      final LockAndQueue lock = locking.getMetaLock();
1006      lock.releaseExclusiveLock(procedure);
1007      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1008      int waitingCount = wakeWaitingProcedures(lock);
1009      wakePollIfNeeded(waitingCount);
1010    } finally {
1011      schedUnlock();
1012    }
1013  }
1014
1015  /**
1016   * For debugging. Expensive.
1017   */
1018  @VisibleForTesting
1019  public String dumpLocks() throws IOException {
1020    schedLock();
1021    try {
1022      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1023      return this.locking.toString();
1024    } finally {
1025      schedUnlock();
1026    }
1027  }
1028}