001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableExistsException;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
032import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
033import org.apache.hadoop.hbase.procedure2.LockAndQueue;
034import org.apache.hadoop.hbase.procedure2.LockStatus;
035import org.apache.hadoop.hbase.procedure2.LockedResource;
036import org.apache.hadoop.hbase.procedure2.LockedResourceType;
037import org.apache.hadoop.hbase.procedure2.Procedure;
038import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
039import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
041import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * ProcedureScheduler for the Master Procedures.
048 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
049 * that can be executed without having to wait on a lock.
050 * Most of the master operations can be executed concurrently, if they
051 * are operating on different tables (e.g. two create table procedures can be performed
052 * at the same time) or against two different servers; say two servers that crashed at
053 * about the same time.
054 *
055 * <p>Each procedure should implement an Interface providing information for this queue.
056 * For example table related procedures should implement TableProcedureInterface.
057 * Each procedure will be pushed in its own queue, and based on the operation type
058 * we may make smarter decisions: e.g. we can abort all the operations preceding
059 * a delete table, or similar.
060 *
061 * <h4>Concurrency control</h4>
 * Concurrent access to member variables (metaRunQueue, tableRunQueue, serverRunQueue,
 * peerRunQueue, locking, metaMap, tableMap, peerMap, serverBuckets) is controlled by
 * schedLock(). This mainly includes:<br>
064 * <ul>
065 *   <li>
066 *     {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue
067 *     when:
068 *     <ol>
069 *       <li>Queue was empty before push (so must have been out of run-queue)</li>
070 *       <li>Child procedure is added (which means parent procedure holds exclusive lock, and it
071 *           must have moved Queue out of run-queue)</li>
072 *     </ol>
073 *   </li>
074 *   <li>
075 *     {@link #poll(long)}: A poll will remove a Queue from run-queue when:
076 *     <ol>
077 *       <li>Queue becomes empty after poll</li>
078 *       <li>Exclusive lock is requested by polled procedure and lock is available (returns the
079 *           procedure)</li>
080 *       <li>Exclusive lock is requested but lock is not available (returns null)</li>
081 *       <li>Polled procedure is child of parent holding exclusive lock and the next procedure is
082 *           not a child</li>
083 *     </ol>
084 *   </li>
085 *   <li>
086 *     Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
087 *     <ol>
088 *       <li>Exclusive lock</li>
089 *       <li>Last shared lock (in case queue was removed because next procedure in queue required
090 *           exclusive lock)</li>
091 *     </ol>
092 *   </li>
093 * </ul>
094 */
095@InterfaceAudience.Private
096public class MasterProcedureScheduler extends AbstractProcedureScheduler {
097  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
098
099  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
100    (n, k) -> n.compareKey((ServerName) k);
101  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
102    (n, k) -> n.compareKey((TableName) k);
103  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
104    (n, k) -> n.compareKey((String) k);
105  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
106    (n, k) -> n.compareKey((TableName) k);
107
108  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
109  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
110  private final FairQueue<String> peerRunQueue = new FairQueue<>();
111  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
112
113  private final ServerQueue[] serverBuckets = new ServerQueue[128];
114  private TableQueue tableMap = null;
115  private PeerQueue peerMap = null;
116  private MetaQueue metaMap = null;
117
118  private final SchemaLocking locking;
119
120  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
121    locking = new SchemaLocking(procedureRetriever);
122  }
123
124  @Override
125  public void yield(final Procedure proc) {
126    push(proc, false, true);
127  }
128
129  @Override
130  protected void enqueue(final Procedure proc, final boolean addFront) {
131    if (isMetaProcedure(proc)) {
132      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
133    } else if (isTableProcedure(proc)) {
134      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
135    } else if (isServerProcedure(proc)) {
136      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
137      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
138    } else if (isPeerProcedure(proc)) {
139      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
140    } else {
141      // TODO: at the moment we only have Table and Server procedures
142      // if you are implementing a non-table/non-server procedure, you have two options: create
143      // a group for all the non-table/non-server procedures or try to find a key for your
144      // non-table/non-server procedures and implement something similar to the TableRunQueue.
145      throw new UnsupportedOperationException(
146        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
147    }
148  }
149
150  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
151      Procedure<?> proc, boolean addFront) {
152    queue.add(proc, addFront);
153    // For the following conditions, we will put the queue back into execution
154    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
155    // which means it can be executed immediately.
156    // 2. The exclusive lock for this queue has not been held.
157    // 3. The given procedure has the exclusive lock permission for this queue.
158    Supplier<String> reason = null;
159    if (proc.hasLock()) {
160      reason = () -> proc + " has lock";
161    } else if (proc.isLockedWhenLoading()) {
162      reason = () -> proc + " restores lock when restarting";
163    } else if (!queue.getLockStatus().hasExclusiveLock()) {
164      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
165    } else if (queue.getLockStatus().hasLockAccess(proc)) {
166      reason = () -> proc + " has the excusive lock access";
167    }
168    if (reason != null) {
169      addToRunQueue(fairq, queue, reason);
170    }
171  }
172
173  @Override
174  protected boolean queueHasRunnables() {
175    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
176      serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
177  }
178
179  @Override
180  protected Procedure dequeue() {
181    // meta procedure is always the first priority
182    Procedure<?> pollResult = doPoll(metaRunQueue);
183    // For now, let server handling have precedence over table handling; presumption is that it
184    // is more important handling crashed servers than it is running the
185    // enabling/disabling tables, etc.
186    if (pollResult == null) {
187      pollResult = doPoll(serverRunQueue);
188    }
189    if (pollResult == null) {
190      pollResult = doPoll(peerRunQueue);
191    }
192    if (pollResult == null) {
193      pollResult = doPoll(tableRunQueue);
194    }
195    return pollResult;
196  }
197
198  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
199    LockStatus s = rq.getLockStatus();
200    // if we have the lock access, we are ready
201    if (s.hasLockAccess(proc)) {
202      return true;
203    }
204    boolean xlockReq = rq.requireExclusiveLock(proc);
205    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
206    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
207    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
208  }
209
210  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
211    Queue<T> rq = fairq.poll();
212    if (rq == null || !rq.isAvailable()) {
213      return null;
214    }
215    // loop until we find out a procedure which is ready to run, or if we have checked all the
216    // procedures, then we give up and remove the queue from run queue.
217    for (int i = 0, n = rq.size(); i < n; i++) {
218      Procedure<?> proc = rq.poll();
219      if (isLockReady(proc, rq)) {
220        // the queue is empty, remove from run queue
221        if (rq.isEmpty()) {
222          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
223        }
224        return proc;
225      }
226      // we are not ready to run, add back and try the next procedure
227      rq.add(proc, false);
228    }
229    // no procedure is ready for execution, remove from run queue
230    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
231    return null;
232  }
233
234  @Override
235  public List<LockedResource> getLocks() {
236    schedLock();
237    try {
238      return locking.getLocks();
239    } finally {
240      schedUnlock();
241    }
242  }
243
244  @Override
245  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
246    schedLock();
247    try {
248      return locking.getLockResource(resourceType, resourceName);
249    } finally {
250      schedUnlock();
251    }
252  }
253
254  @Override
255  public void clear() {
256    schedLock();
257    try {
258      clearQueue();
259      locking.clear();
260    } finally {
261      schedUnlock();
262    }
263  }
264
265  private void clearQueue() {
266    // Remove Servers
267    for (int i = 0; i < serverBuckets.length; ++i) {
268      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
269      serverBuckets[i] = null;
270    }
271
272    // Remove Tables
273    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
274    tableMap = null;
275
276    // Remove Peers
277    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
278    peerMap = null;
279
280    assert size() == 0 : "expected queue size to be 0, got " + size();
281  }
282
283  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
284      FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
285    while (treeMap != null) {
286      Queue<T> node = AvlTree.getFirst(treeMap);
287      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
288      if (fairq != null) {
289        removeFromRunQueue(fairq, node, () -> "clear all queues");
290      }
291    }
292  }
293
294  private int queueSize(Queue<?> head) {
295    int count = 0;
296    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
297    while (iter.hasNext()) {
298      count += iter.next().size();
299    }
300    return count;
301  }
302
303  @Override
304  protected int queueSize() {
305    int count = 0;
306    for (ServerQueue serverMap : serverBuckets) {
307      count += queueSize(serverMap);
308    }
309    count += queueSize(tableMap);
310    count += queueSize(peerMap);
311    count += queueSize(metaMap);
312    return count;
313  }
314
315  @Override
316  public void completionCleanup(final Procedure proc) {
317    if (proc instanceof TableProcedureInterface) {
318      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
319      boolean tableDeleted;
320      if (proc.hasException()) {
321        Exception procEx = proc.getException().unwrapRemoteException();
322        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
323          // create failed because the table already exist
324          tableDeleted = !(procEx instanceof TableExistsException);
325        } else {
326          // the operation failed because the table does not exist
327          tableDeleted = (procEx instanceof TableNotFoundException);
328        }
329      } else {
330        // the table was deleted
331        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
332      }
333      if (tableDeleted) {
334        markTableAsDeleted(iProcTable.getTableName(), proc);
335        return;
336      }
337    } else if (proc instanceof PeerProcedureInterface) {
338      tryCleanupPeerQueue(getPeerId(proc), proc);
339    } else if (proc instanceof ServerProcedureInterface) {
340      tryCleanupServerQueue(getServerName(proc), proc);
341    } else {
342      // No cleanup for other procedure types, yet.
343      return;
344    }
345  }
346
347  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
348      Supplier<String> reason) {
349    if (LOG.isTraceEnabled()) {
350      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
351    }
352    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
353      fairq.add(queue);
354    }
355  }
356
357  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
358      Queue<T> queue, Supplier<String> reason) {
359    if (LOG.isTraceEnabled()) {
360      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
361    }
362    if (AvlIterableList.isLinked(queue)) {
363      fairq.remove(queue);
364    }
365  }
366
367  // ============================================================================
368  //  Table Queue Lookup Helpers
369  // ============================================================================
370  private TableQueue getTableQueue(TableName tableName) {
371    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
372    if (node != null) return node;
373
374    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
375        locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
376    tableMap = AvlTree.insert(tableMap, node);
377    return node;
378  }
379
380  private void removeTableQueue(TableName tableName) {
381    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
382    locking.removeTableLock(tableName);
383  }
384
385  private static boolean isTableProcedure(Procedure<?> proc) {
386    return proc instanceof TableProcedureInterface;
387  }
388
389  private static TableName getTableName(Procedure<?> proc) {
390    return ((TableProcedureInterface)proc).getTableName();
391  }
392
393  // ============================================================================
394  //  Server Queue Lookup Helpers
395  // ============================================================================
396  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
397    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
398    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
399    if (node != null) {
400      return node;
401    }
402    int priority;
403    if (proc != null) {
404      priority = MasterProcedureUtil.getServerPriority(proc);
405    } else {
406      priority = 1;
407    }
408    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
409    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
410    return node;
411  }
412
413  private void removeServerQueue(ServerName serverName) {
414    int index = getBucketIndex(serverBuckets, serverName.hashCode());
415    serverBuckets[index] =
416      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
417    locking.removeServerLock(serverName);
418  }
419
420  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
421    schedLock();
422    try {
423      int index = getBucketIndex(serverBuckets, serverName.hashCode());
424      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
425      if (node == null) {
426        return;
427      }
428
429      LockAndQueue lock = locking.getServerLock(serverName);
430      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
431        removeFromRunQueue(serverRunQueue, node,
432          () -> "clean up server queue after " + proc + " completed");
433        removeServerQueue(serverName);
434      }
435    } finally {
436      schedUnlock();
437    }
438  }
439
440  private static int getBucketIndex(Object[] buckets, int hashCode) {
441    return Math.abs(hashCode) % buckets.length;
442  }
443
444  private static boolean isServerProcedure(Procedure<?> proc) {
445    return proc instanceof ServerProcedureInterface;
446  }
447
448  private static ServerName getServerName(Procedure<?> proc) {
449    return ((ServerProcedureInterface)proc).getServerName();
450  }
451
452  // ============================================================================
453  //  Peer Queue Lookup Helpers
454  // ============================================================================
455  private PeerQueue getPeerQueue(String peerId) {
456    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
457    if (node != null) {
458      return node;
459    }
460    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
461    peerMap = AvlTree.insert(peerMap, node);
462    return node;
463  }
464
465  private void removePeerQueue(String peerId) {
466    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
467    locking.removePeerLock(peerId);
468  }
469
470  private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
471    schedLock();
472    try {
473      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
474      if (queue == null) {
475        return;
476      }
477
478      final LockAndQueue lock = locking.getPeerLock(peerId);
479      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
480        removeFromRunQueue(peerRunQueue, queue,
481          () -> "clean up peer queue after " + procedure + " completed");
482        removePeerQueue(peerId);
483      }
484    } finally {
485      schedUnlock();
486    }
487  }
488
489  private static boolean isPeerProcedure(Procedure<?> proc) {
490    return proc instanceof PeerProcedureInterface;
491  }
492
493  private static String getPeerId(Procedure<?> proc) {
494    return ((PeerProcedureInterface) proc).getPeerId();
495  }
496
497  // ============================================================================
498  //  Meta Queue Lookup Helpers
499  // ============================================================================
500  private MetaQueue getMetaQueue() {
501    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
502    if (node != null) {
503      return node;
504    }
505    node = new MetaQueue(locking.getMetaLock());
506    metaMap = AvlTree.insert(metaMap, node);
507    return node;
508  }
509
510  private static boolean isMetaProcedure(Procedure<?> proc) {
511    return proc instanceof MetaProcedureInterface;
512  }
513  // ============================================================================
514  //  Table Locking Helpers
515  // ============================================================================
516  /**
517   * Get lock info for a resource of specified type and name and log details
518   */
519  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
520    if (!LOG.isDebugEnabled()) {
521      return;
522    }
523
524    LockedResource lockedResource = getLockResource(resourceType, resourceName);
525    if (lockedResource != null) {
526      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" +
527          lockedResource.getSharedLockCount();
528
529      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
530      if (proc != null) {
531        msg += ", exclusively locked by procId=" + proc.getProcId();
532      }
533      LOG.debug(msg);
534    }
535  }
536
  /**
   * Suspend the procedure if the specified table is already locked.
   * Other operations in the table-queue will be executed after the lock is released.
   * <p>Acquisition order: the shared lock of the table's namespace first, then the exclusive lock
   * of the table. If the table lock cannot be taken, the namespace shared lock acquired just before
   * is released again and the procedure is parked on the table lock. On success the table queue is
   * taken out of the table run queue.
   * @param procedure the procedure trying to acquire the lock
   * @param table Table to lock
   * @return true if the procedure has to wait for the table to be available
   */
  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final String namespace = table.getNamespaceAsString();
      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      final LockAndQueue tableLock = locking.getTableLock(table);
      // namespace shared lock not available: park the procedure on the namespace lock
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        logLockedResource(LockedResourceType.NAMESPACE, namespace);
        return true;
      }
      // table exclusive lock not available: undo the namespace shared lock taken above,
      // then park the procedure on the table lock
      if (!tableLock.tryExclusiveLock(procedure)) {
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
        return true;
      }
      // both locks held; wakeTableExclusiveLock() adds the table queue back to the run queue
      removeFromRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " held the exclusive lock");
      return false;
    } finally {
      schedUnlock();
    }
  }
568
  /**
   * Wake the procedures waiting for the specified table
   * <p>Releases the table exclusive lock and the namespace shared lock. For each lock whose release
   * call returns true, the procedures waiting on it are woken. The table queue is then added back
   * to the table run queue (unconditionally, see addToRunQueue for its own checks).
   * @param procedure the procedure releasing the lock
   * @param table the name of the table that has the exclusive lock
   */
  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      // number of procedures woken, used below to decide whether pollers must be signalled
      int waitingCount = 0;
      if (tableLock.releaseExclusiveLock(procedure)) {
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      // waitTableExclusiveLock() removed the queue from the run queue; put it back
      addToRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " released the exclusive lock");
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }
593
594  /**
595   * Suspend the procedure if the specified table is already locked.
596   * other "read" operations in the table-queue may be executed concurrently,
597   * @param procedure the procedure trying to acquire the lock
598   * @param table Table to lock
599   * @return true if the procedure has to wait for the table to be available
600   */
601  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
602    return waitTableQueueSharedLock(procedure, table) == null;
603  }
604
  /**
   * Takes the namespace shared lock and then the table shared lock for {@code procedure}.
   * If either lock is unavailable, the procedure is parked on that lock; when the table lock is
   * the one that fails, the namespace shared lock taken first is released again.
   * @param procedure the procedure trying to acquire the locks
   * @param table the table to lock
   * @return the table queue if both locks were acquired, or null if the procedure has to wait
   */
  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        return null;
      }

      // undo the namespace shared lock before parking on the table lock
      if (!tableLock.trySharedLock(procedure)) {
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        return null;
      }

      return getTableQueue(table);
    } finally {
      schedUnlock();
    }
  }
626
  /**
   * Wake the procedures waiting for the specified table
   * <p>Releases the table shared lock and the namespace shared lock. When the table release returns
   * true (presumably when the last shared holder leaves — confirm in LockAndQueue), the table queue
   * is added back to the run queue, since it may have been removed because the next procedure
   * needed the exclusive lock, and the table waiters are woken. When the namespace release returns
   * true, the namespace waiters are woken.
   * @param procedure the procedure releasing the lock
   * @param table the name of the table that has the shared lock
   */
  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      // number of procedures woken, used below to decide whether pollers must be signalled
      int waitingCount = 0;
      if (tableLock.releaseSharedLock()) {
        addToRunQueue(tableRunQueue, getTableQueue(table),
          () -> procedure + " released the shared lock");
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }
651
652  /**
653   * Tries to remove the queue and the table-lock of the specified table.
654   * If there are new operations pending (e.g. a new create),
655   * the remove will not be performed.
656   * @param table the name of the table that should be marked as deleted
657   * @param procedure the procedure that is removing the table
658   * @return true if deletion succeeded, false otherwise meaning that there are
659   *     other new operations pending for that table (e.g. a new create).
660   */
661  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
662    schedLock();
663    try {
664      final TableQueue queue = getTableQueue(table);
665      final LockAndQueue tableLock = locking.getTableLock(table);
666      if (queue == null) return true;
667
668      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
669        // remove the table from the run-queue and the map
670        if (AvlIterableList.isLinked(queue)) {
671          tableRunQueue.remove(queue);
672        }
673        removeTableQueue(table);
674      } else {
675        // TODO: If there are no create, we can drop all the other ops
676        return false;
677      }
678    } finally {
679      schedUnlock();
680    }
681    return true;
682  }
683
684  // ============================================================================
685  //  Region Locking Helpers
686  // ============================================================================
687  /**
688   * Suspend the procedure if the specified region is already locked.
689   * @param procedure the procedure trying to acquire the lock on the region
690   * @param regionInfo the region we are trying to lock
691   * @return true if the procedure has to wait for the regions to be available
692   */
693  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
694    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
695  }
696
697  /**
698   * Suspend the procedure if the specified set of regions are already locked.
699   * @param procedure the procedure trying to acquire the lock on the regions
700   * @param table the table name of the regions we are trying to lock
701   * @param regionInfos the list of regions we are trying to lock
702   * @return true if the procedure has to wait for the regions to be available
703   */
704  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
705      final RegionInfo... regionInfos) {
706    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
707    schedLock();
708    try {
709      assert table != null;
710      if (waitTableSharedLock(procedure, table)) {
711        return true;
712      }
713
714      // acquire region xlocks or wait
715      boolean hasLock = true;
716      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length];
717      for (int i = 0; i < regionInfos.length; ++i) {
718        assert regionInfos[i] != null;
719        assert regionInfos[i].getTable() != null;
720        assert regionInfos[i].getTable().equals(table): regionInfos[i] + " " + procedure;
721        assert i == 0 || regionInfos[i] != regionInfos[i - 1] : "duplicate region: " + regionInfos[i];
722
723        regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName());
724        if (!regionLocks[i].tryExclusiveLock(procedure)) {
725          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
726              regionLocks[i].getExclusiveLockProcIdOwner());
727          waitProcedure(regionLocks[i], procedure);
728          hasLock = false;
729          while (i-- > 0) {
730            regionLocks[i].releaseExclusiveLock(procedure);
731          }
732          break;
733        } else {
734          LOG.info("Took xlock for {}", procedure);
735        }
736      }
737
738      if (!hasLock) {
739        wakeTableSharedLock(procedure, table);
740      }
741      return !hasLock;
742    } finally {
743      schedUnlock();
744    }
745  }
746
747  /**
748   * Wake the procedures waiting for the specified region
749   * @param procedure the procedure that was holding the region
750   * @param regionInfo the region the procedure was holding
751   */
752  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
753    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
754  }
755
756  /**
757   * Wake the procedures waiting for the specified regions
758   * @param procedure the procedure that was holding the regions
759   * @param regionInfos the list of regions the procedure was holding
760   */
761  public void wakeRegions(final Procedure<?> procedure,final TableName table,
762      final RegionInfo... regionInfos) {
763    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
764    schedLock();
765    try {
766      int numProcs = 0;
767      final Procedure<?>[] nextProcs = new Procedure[regionInfos.length];
768      for (int i = 0; i < regionInfos.length; ++i) {
769        assert regionInfos[i].getTable().equals(table);
770        assert i == 0 || regionInfos[i] != regionInfos[i - 1] : "duplicate region: " + regionInfos[i];
771
772        LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName());
773        if (regionLock.releaseExclusiveLock(procedure)) {
774          if (!regionLock.isWaitingQueueEmpty()) {
775            // release one procedure at the time since regions has an xlock
776            nextProcs[numProcs++] = regionLock.removeFirst();
777          } else {
778            locking.removeRegionLock(regionInfos[i].getEncodedName());
779          }
780        }
781      }
782
783      // awake procedures if any
784      for (int i = numProcs - 1; i >= 0; --i) {
785        wakeProcedure(nextProcs[i]);
786      }
787      wakePollIfNeeded(numProcs);
788      // release the table shared-lock.
789      wakeTableSharedLock(procedure, table);
790    } finally {
791      schedUnlock();
792    }
793  }
794
795  // ============================================================================
796  //  Namespace Locking Helpers
797  // ============================================================================
798  /**
799   * Suspend the procedure if the specified namespace is already locked.
800   * @see #wakeNamespaceExclusiveLock(Procedure,String)
801   * @param procedure the procedure trying to acquire the lock
802   * @param namespace Namespace to lock
803   * @return true if the procedure has to wait for the namespace to be available
804   */
805  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
806    schedLock();
807    try {
808      final LockAndQueue systemNamespaceTableLock =
809        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
810      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
811        waitProcedure(systemNamespaceTableLock, procedure);
812        logLockedResource(LockedResourceType.TABLE,
813          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
814        return true;
815      }
816
817      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
818      if (!namespaceLock.tryExclusiveLock(procedure)) {
819        systemNamespaceTableLock.releaseSharedLock();
820        waitProcedure(namespaceLock, procedure);
821        logLockedResource(LockedResourceType.NAMESPACE, namespace);
822        return true;
823      }
824      return false;
825    } finally {
826      schedUnlock();
827    }
828  }
829
830  /**
831   * Wake the procedures waiting for the specified namespace
832   * @see #waitNamespaceExclusiveLock(Procedure,String)
833   * @param procedure the procedure releasing the lock
834   * @param namespace the namespace that has the exclusive lock
835   */
836  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
837    schedLock();
838    try {
839      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
840      final LockAndQueue systemNamespaceTableLock =
841          locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
842      int waitingCount = 0;
843      if (namespaceLock.releaseExclusiveLock(procedure)) {
844        waitingCount += wakeWaitingProcedures(namespaceLock);
845      }
846      if (systemNamespaceTableLock.releaseSharedLock()) {
847        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
848          () -> procedure + " released namespace exclusive lock");
849        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
850      }
851      wakePollIfNeeded(waitingCount);
852    } finally {
853      schedUnlock();
854    }
855  }
856
857  // ============================================================================
858  //  Server Locking Helpers
859  // ============================================================================
860  /**
861   * Try to acquire the exclusive lock on the specified server.
862   * @see #wakeServerExclusiveLock(Procedure,ServerName)
863   * @param procedure the procedure trying to acquire the lock
864   * @param serverName Server to lock
865   * @return true if the procedure has to wait for the server to be available
866   */
867  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
868      final ServerName serverName) {
869    schedLock();
870    try {
871      final LockAndQueue lock = locking.getServerLock(serverName);
872      if (lock.tryExclusiveLock(procedure)) {
873        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
874        // so.
875        removeFromRunQueue(serverRunQueue,
876          getServerQueue(serverName,
877            procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
878              : null),
879          () -> procedure + " held exclusive lock");
880        return false;
881      }
882      waitProcedure(lock, procedure);
883      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
884      return true;
885    } finally {
886      schedUnlock();
887    }
888  }
889
890  /**
891   * Wake the procedures waiting for the specified server
892   * @see #waitServerExclusiveLock(Procedure,ServerName)
893   * @param procedure the procedure releasing the lock
894   * @param serverName the server that has the exclusive lock
895   */
896  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
897    schedLock();
898    try {
899      final LockAndQueue lock = locking.getServerLock(serverName);
900      // Only SCP will acquire/release server lock so do not need to check the return value here.
901      lock.releaseExclusiveLock(procedure);
902      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
903      // so.
904      addToRunQueue(serverRunQueue,
905        getServerQueue(serverName,
906          procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
907            : null), () -> procedure + " released exclusive lock");
908      int waitingCount = wakeWaitingProcedures(lock);
909      wakePollIfNeeded(waitingCount);
910    } finally {
911      schedUnlock();
912    }
913  }
914
915  // ============================================================================
916  //  Peer Locking Helpers
917  // ============================================================================
918  private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
919    return proc.getPeerOperationType() != PeerOperationType.REFRESH;
920  }
921
922  /**
923   * Try to acquire the exclusive lock on the specified peer.
924   * @see #wakePeerExclusiveLock(Procedure, String)
925   * @param procedure the procedure trying to acquire the lock
926   * @param peerId peer to lock
927   * @return true if the procedure has to wait for the peer to be available
928   */
929  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
930    schedLock();
931    try {
932      final LockAndQueue lock = locking.getPeerLock(peerId);
933      if (lock.tryExclusiveLock(procedure)) {
934        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
935          () -> procedure + " held exclusive lock");
936        return false;
937      }
938      waitProcedure(lock, procedure);
939      logLockedResource(LockedResourceType.PEER, peerId);
940      return true;
941    } finally {
942      schedUnlock();
943    }
944  }
945
946  /**
947   * Wake the procedures waiting for the specified peer
948   * @see #waitPeerExclusiveLock(Procedure, String)
949   * @param procedure the procedure releasing the lock
950   * @param peerId the peer that has the exclusive lock
951   */
952  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
953    schedLock();
954    try {
955      final LockAndQueue lock = locking.getPeerLock(peerId);
956      if (lock.releaseExclusiveLock(procedure)) {
957        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
958          () -> procedure + " released exclusive lock");
959        int waitingCount = wakeWaitingProcedures(lock);
960        wakePollIfNeeded(waitingCount);
961      }
962    } finally {
963      schedUnlock();
964    }
965  }
966
967  // ============================================================================
968  // Meta Locking Helpers
969  // ============================================================================
970  /**
971   * Try to acquire the exclusive lock on meta.
972   * @see #wakeMetaExclusiveLock(Procedure)
973   * @param procedure the procedure trying to acquire the lock
974   * @return true if the procedure has to wait for meta to be available
975   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
976   *             {@link RecoverMetaProcedure}.
977   */
978  @Deprecated
979  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
980    schedLock();
981    try {
982      final LockAndQueue lock = locking.getMetaLock();
983      if (lock.tryExclusiveLock(procedure)) {
984        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
985        return false;
986      }
987      waitProcedure(lock, procedure);
988      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
989      return true;
990    } finally {
991      schedUnlock();
992    }
993  }
994
995  /**
996   * Wake the procedures waiting for meta.
997   * @see #waitMetaExclusiveLock(Procedure)
998   * @param procedure the procedure releasing the lock
999   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1000   *             {@link RecoverMetaProcedure}.
1001   */
1002  @Deprecated
1003  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1004    schedLock();
1005    try {
1006      final LockAndQueue lock = locking.getMetaLock();
1007      lock.releaseExclusiveLock(procedure);
1008      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1009      int waitingCount = wakeWaitingProcedures(lock);
1010      wakePollIfNeeded(waitingCount);
1011    } finally {
1012      schedUnlock();
1013    }
1014  }
1015
1016  /**
1017   * For debugging. Expensive.
1018   */
1019  public String dumpLocks() throws IOException {
1020    schedLock();
1021    try {
1022      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1023      return this.locking.toString();
1024    } finally {
1025      schedUnlock();
1026    }
1027  }
1028}