001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableExistsException;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
032import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
033import org.apache.hadoop.hbase.procedure2.LockAndQueue;
034import org.apache.hadoop.hbase.procedure2.LockStatus;
035import org.apache.hadoop.hbase.procedure2.LockedResource;
036import org.apache.hadoop.hbase.procedure2.LockedResourceType;
037import org.apache.hadoop.hbase.procedure2.Procedure;
038import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
039import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
041import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
047
048/**
049 * ProcedureScheduler for the Master Procedures.
050 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
051 * that can be executed without having to wait on a lock.
052 * Most of the master operations can be executed concurrently, if they
053 * are operating on different tables (e.g. two create table procedures can be performed
054 * at the same time) or against two different servers; say two servers that crashed at
055 * about the same time.
056 *
057 * <p>Each procedure should implement an Interface providing information for this queue.
058 * For example table related procedures should implement TableProcedureInterface.
059 * Each procedure will be pushed in its own queue, and based on the operation type
060 * we may make smarter decisions: e.g. we can abort all the operations preceding
061 * a delete table, or similar.
062 *
063 * <h4>Concurrency control</h4>
064 * Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap,
065 * serverBuckets) is controlled by schedLock(). This mainly includes:<br>
066 * <ul>
067 *   <li>
068 *     {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue
069 *     when:
070 *     <ol>
071 *       <li>Queue was empty before push (so must have been out of run-queue)</li>
072 *       <li>Child procedure is added (which means parent procedure holds exclusive lock, and it
073 *           must have moved Queue out of run-queue)</li>
074 *     </ol>
075 *   </li>
076 *   <li>
077 *     {@link #poll(long)}: A poll will remove a Queue from run-queue when:
078 *     <ol>
079 *       <li>Queue becomes empty after poll</li>
080 *       <li>Exclusive lock is requested by polled procedure and lock is available (returns the
081 *           procedure)</li>
082 *       <li>Exclusive lock is requested but lock is not available (returns null)</li>
083 *       <li>Polled procedure is child of parent holding exclusive lock and the next procedure is
084 *           not a child</li>
085 *     </ol>
086 *   </li>
087 *   <li>
088 *     Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
089 *     <ol>
090 *       <li>Exclusive lock</li>
091 *       <li>Last shared lock (in case queue was removed because next procedure in queue required
092 *           exclusive lock)</li>
093 *     </ol>
094 *   </li>
095 * </ul>
096 */
097@InterfaceAudience.Private
098public class MasterProcedureScheduler extends AbstractProcedureScheduler {
099  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
100
101  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
102    (n, k) -> n.compareKey((ServerName) k);
103  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
104    (n, k) -> n.compareKey((TableName) k);
105  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
106    (n, k) -> n.compareKey((String) k);
107  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
108    (n, k) -> n.compareKey((TableName) k);
109
110  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
111  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
112  private final FairQueue<String> peerRunQueue = new FairQueue<>();
113  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
114
115  private final ServerQueue[] serverBuckets = new ServerQueue[128];
116  private TableQueue tableMap = null;
117  private PeerQueue peerMap = null;
118  private MetaQueue metaMap = null;
119
120  private final SchemaLocking locking;
121
122  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
123    locking = new SchemaLocking(procedureRetriever);
124  }
125
126  @Override
127  public void yield(final Procedure proc) {
128    push(proc, false, true);
129  }
130
131  @Override
132  protected void enqueue(final Procedure proc, final boolean addFront) {
133    if (isMetaProcedure(proc) ||
134        (isTableProcedure(proc) && getTableName(proc).equals(TableName.META_TABLE_NAME))) {
135      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
136    } else if (isTableProcedure(proc)) {
137      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
138    } else if (isServerProcedure(proc)) {
139      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
140      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
141    } else if (isPeerProcedure(proc)) {
142      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
143    } else {
144      // TODO: at the moment we only have Table and Server procedures
145      // if you are implementing a non-table/non-server procedure, you have two options: create
146      // a group for all the non-table/non-server procedures or try to find a key for your
147      // non-table/non-server procedures and implement something similar to the TableRunQueue.
148      throw new UnsupportedOperationException(
149        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
150    }
151  }
152
153  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
154      Procedure<?> proc, boolean addFront) {
155    queue.add(proc, addFront);
156    // For the following conditions, we will put the queue back into execution
157    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
158    // which means it can be executed immediately.
159    // 2. The exclusive lock for this queue has not been held.
160    // 3. The given procedure has the exclusive lock permission for this queue.
161    Supplier<String> reason = null;
162    if (proc.hasLock()) {
163      reason = () -> proc + " has lock";
164    } else if (proc.isLockedWhenLoading()) {
165      reason = () -> proc + " restores lock when restarting";
166    } else if (!queue.getLockStatus().hasExclusiveLock()) {
167      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
168    } else if (queue.getLockStatus().hasLockAccess(proc)) {
169      reason = () -> proc + " has the excusive lock access";
170    }
171    if (reason != null) {
172      addToRunQueue(fairq, queue, reason);
173    }
174  }
175
176  @Override
177  protected boolean queueHasRunnables() {
178    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
179      serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
180  }
181
182  @Override
183  protected Procedure dequeue(boolean onlyUrgent) {
184    // meta procedure is always the first priority
185    Procedure<?> pollResult = doPoll(metaRunQueue);
186    if (onlyUrgent) {
187      return pollResult;
188    }
189    // For now, let server handling have precedence over table handling; presumption is that it
190    // is more important handling crashed servers than it is running the
191    // enabling/disabling tables, etc.
192    if (pollResult == null) {
193      pollResult = doPoll(serverRunQueue);
194    }
195    if (pollResult == null) {
196      pollResult = doPoll(peerRunQueue);
197    }
198    if (pollResult == null) {
199      pollResult = doPoll(tableRunQueue);
200    }
201    return pollResult;
202  }
203
204  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
205    LockStatus s = rq.getLockStatus();
206    // if we have the lock access, we are ready
207    if (s.hasLockAccess(proc)) {
208      return true;
209    }
210    boolean xlockReq = rq.requireExclusiveLock(proc);
211    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
212    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
213    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
214  }
215
  /**
   * Polls a queue from the given run queue and searches it for a procedure that is ready to run
   * according to {@code isLockReady}.
   * <p>
   * Procedures that are not ready are re-added to the queue with {@code addFront = false}. At
   * most as many procedures as the queue held on entry are examined.
   * @param fairq the run queue to poll
   * @return a procedure that is ready to run, or null if the run queue gave no available queue
   *   or no procedure of the polled queue is ready
   */
  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
    Queue<T> rq = fairq.poll();
    if (rq == null || !rq.isAvailable()) {
      return null;
    }
    // loop until we find out a procedure which is ready to run, or if we have checked all the
    // procedures, then we give up and remove the queue from run queue.
    // The bound n is captured once, so re-added procedures are not examined twice.
    for (int i = 0, n = rq.size(); i < n; i++) {
      Procedure<?> proc = rq.poll();
      if (isLockReady(proc, rq)) {
        // the queue is empty, remove from run queue
        if (rq.isEmpty()) {
          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
        }
        return proc;
      }
      // we are not ready to run, add back and try the next procedure
      rq.add(proc, false);
    }
    // no procedure is ready for execution, remove from run queue
    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
    return null;
  }
239
240  @Override
241  public List<LockedResource> getLocks() {
242    schedLock();
243    try {
244      return locking.getLocks();
245    } finally {
246      schedUnlock();
247    }
248  }
249
250  @Override
251  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
252    schedLock();
253    try {
254      return locking.getLockResource(resourceType, resourceName);
255    } finally {
256      schedUnlock();
257    }
258  }
259
  /**
   * Clears the scheduler: first the queues (see {@code clearQueue()}), then the schema locking.
   * Both steps run while holding the scheduler lock.
   */
  @Override
  public void clear() {
    schedLock();
    try {
      clearQueue();
      locking.clear();
    } finally {
      schedUnlock();
    }
  }
270
271  private void clearQueue() {
272    // Remove Servers
273    for (int i = 0; i < serverBuckets.length; ++i) {
274      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
275      serverBuckets[i] = null;
276    }
277
278    // Remove Tables
279    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
280    tableMap = null;
281
282    // Remove Peers
283    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
284    peerMap = null;
285
286    assert size() == 0 : "expected queue size to be 0, got " + size();
287  }
288
289  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
290      FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
291    while (treeMap != null) {
292      Queue<T> node = AvlTree.getFirst(treeMap);
293      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
294      if (fairq != null) {
295        removeFromRunQueue(fairq, node, () -> "clear all queues");
296      }
297    }
298  }
299
300  private int queueSize(Queue<?> head) {
301    int count = 0;
302    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
303    while (iter.hasNext()) {
304      count += iter.next().size();
305    }
306    return count;
307  }
308
309  @Override
310  protected int queueSize() {
311    int count = 0;
312    for (ServerQueue serverMap : serverBuckets) {
313      count += queueSize(serverMap);
314    }
315    count += queueSize(tableMap);
316    count += queueSize(peerMap);
317    count += queueSize(metaMap);
318    return count;
319  }
320
321  @Override
322  public void completionCleanup(final Procedure proc) {
323    if (proc instanceof TableProcedureInterface) {
324      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
325      boolean tableDeleted;
326      if (proc.hasException()) {
327        Exception procEx = proc.getException().unwrapRemoteException();
328        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
329          // create failed because the table already exist
330          tableDeleted = !(procEx instanceof TableExistsException);
331        } else {
332          // the operation failed because the table does not exist
333          tableDeleted = (procEx instanceof TableNotFoundException);
334        }
335      } else {
336        // the table was deleted
337        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
338      }
339      if (tableDeleted) {
340        markTableAsDeleted(iProcTable.getTableName(), proc);
341        return;
342      }
343    } else if (proc instanceof PeerProcedureInterface) {
344      tryCleanupPeerQueue(getPeerId(proc), proc);
345    } else if (proc instanceof ServerProcedureInterface) {
346      tryCleanupServerQueue(getServerName(proc), proc);
347    } else {
348      // No cleanup for other procedure types, yet.
349      return;
350    }
351  }
352
353  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
354      Supplier<String> reason) {
355    if (LOG.isDebugEnabled()) {
356      LOG.debug("Add {} to run queue because: {}", queue, reason.get());
357    }
358    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
359      fairq.add(queue);
360    }
361  }
362
363  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
364      Queue<T> queue, Supplier<String> reason) {
365    if (LOG.isDebugEnabled()) {
366      LOG.debug("Remove {} from run queue because: {}", queue, reason.get());
367    }
368    if (AvlIterableList.isLinked(queue)) {
369      fairq.remove(queue);
370    }
371  }
372
373  // ============================================================================
374  //  Table Queue Lookup Helpers
375  // ============================================================================
376  private TableQueue getTableQueue(TableName tableName) {
377    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
378    if (node != null) return node;
379
380    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
381        locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
382    tableMap = AvlTree.insert(tableMap, node);
383    return node;
384  }
385
  /**
   * Removes the queue of the given table from the table tree and drops the table lock from the
   * schema locking.
   * @param tableName the table whose queue and lock are removed
   */
  private void removeTableQueue(TableName tableName) {
    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
    locking.removeTableLock(tableName);
  }
390
391  private static boolean isTableProcedure(Procedure<?> proc) {
392    return proc instanceof TableProcedureInterface;
393  }
394
395  private static TableName getTableName(Procedure<?> proc) {
396    return ((TableProcedureInterface)proc).getTableName();
397  }
398
399  // ============================================================================
400  //  Server Queue Lookup Helpers
401  // ============================================================================
402  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
403    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
404    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
405    if (node != null) {
406      return node;
407    }
408    int priority;
409    if (proc != null) {
410      priority = MasterProcedureUtil.getServerPriority(proc);
411    } else {
412      priority = 1;
413    }
414    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
415    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
416    return node;
417  }
418
419  private void removeServerQueue(ServerName serverName) {
420    int index = getBucketIndex(serverBuckets, serverName.hashCode());
421    serverBuckets[index] =
422      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
423    locking.removeServerLock(serverName);
424  }
425
426  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
427    schedLock();
428    try {
429      int index = getBucketIndex(serverBuckets, serverName.hashCode());
430      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
431      if (node == null) {
432        return;
433      }
434
435      LockAndQueue lock = locking.getServerLock(serverName);
436      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
437        removeFromRunQueue(serverRunQueue, node,
438          () -> "clean up server queue after " + proc + " completed");
439        removeServerQueue(serverName);
440      }
441    } finally {
442      schedUnlock();
443    }
444  }
445
446  private static int getBucketIndex(Object[] buckets, int hashCode) {
447    return Math.abs(hashCode) % buckets.length;
448  }
449
450  private static boolean isServerProcedure(Procedure<?> proc) {
451    return proc instanceof ServerProcedureInterface;
452  }
453
454  private static ServerName getServerName(Procedure<?> proc) {
455    return ((ServerProcedureInterface)proc).getServerName();
456  }
457
458  // ============================================================================
459  //  Peer Queue Lookup Helpers
460  // ============================================================================
461  private PeerQueue getPeerQueue(String peerId) {
462    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
463    if (node != null) {
464      return node;
465    }
466    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
467    peerMap = AvlTree.insert(peerMap, node);
468    return node;
469  }
470
  /**
   * Removes the queue of the given peer from the peer tree and drops the peer lock from the
   * schema locking.
   * @param peerId the id of the peer whose queue and lock are removed
   */
  private void removePeerQueue(String peerId) {
    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
    locking.removePeerLock(peerId);
  }
475
476  private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
477    schedLock();
478    try {
479      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
480      if (queue == null) {
481        return;
482      }
483
484      final LockAndQueue lock = locking.getPeerLock(peerId);
485      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
486        removeFromRunQueue(peerRunQueue, queue,
487          () -> "clean up peer queue after " + procedure + " completed");
488        removePeerQueue(peerId);
489      }
490    } finally {
491      schedUnlock();
492    }
493  }
494
495  private static boolean isPeerProcedure(Procedure<?> proc) {
496    return proc instanceof PeerProcedureInterface;
497  }
498
499  private static String getPeerId(Procedure<?> proc) {
500    return ((PeerProcedureInterface) proc).getPeerId();
501  }
502
503  // ============================================================================
504  //  Meta Queue Lookup Helpers
505  // ============================================================================
506  private MetaQueue getMetaQueue() {
507    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
508    if (node != null) {
509      return node;
510    }
511    node = new MetaQueue(locking.getMetaLock());
512    metaMap = AvlTree.insert(metaMap, node);
513    return node;
514  }
515
516  private static boolean isMetaProcedure(Procedure<?> proc) {
517    return proc instanceof MetaProcedureInterface;
518  }
519  // ============================================================================
520  //  Table Locking Helpers
521  // ============================================================================
522  /**
523   * Get lock info for a resource of specified type and name and log details
524   */
525  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
526    if (!LOG.isDebugEnabled()) {
527      return;
528    }
529
530    LockedResource lockedResource = getLockResource(resourceType, resourceName);
531    if (lockedResource != null) {
532      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" +
533          lockedResource.getSharedLockCount();
534
535      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
536      if (proc != null) {
537        msg += ", exclusively locked by procId=" + proc.getProcId();
538      }
539      LOG.debug(msg);
540    }
541  }
542
543  /**
544   * Suspend the procedure if the specified table is already locked.
545   * Other operations in the table-queue will be executed after the lock is released.
546   * @param procedure the procedure trying to acquire the lock
547   * @param table Table to lock
548   * @return true if the procedure has to wait for the table to be available
549   */
550  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
551    schedLock();
552    try {
553      final String namespace = table.getNamespaceAsString();
554      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
555      final LockAndQueue tableLock = locking.getTableLock(table);
556      if (!namespaceLock.trySharedLock(procedure)) {
557        waitProcedure(namespaceLock, procedure);
558        logLockedResource(LockedResourceType.NAMESPACE, namespace);
559        return true;
560      }
561      if (!tableLock.tryExclusiveLock(procedure)) {
562        namespaceLock.releaseSharedLock();
563        waitProcedure(tableLock, procedure);
564        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
565        return true;
566      }
567      removeFromRunQueue(tableRunQueue, getTableQueue(table),
568        () -> procedure + " held the exclusive lock");
569      return false;
570    } finally {
571      schedUnlock();
572    }
573  }
574
575  /**
576   * Wake the procedures waiting for the specified table
577   * @param procedure the procedure releasing the lock
578   * @param table the name of the table that has the exclusive lock
579   */
580  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
581    schedLock();
582    try {
583      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
584      final LockAndQueue tableLock = locking.getTableLock(table);
585      int waitingCount = 0;
586      if (tableLock.releaseExclusiveLock(procedure)) {
587        waitingCount += wakeWaitingProcedures(tableLock);
588      }
589      if (namespaceLock.releaseSharedLock()) {
590        waitingCount += wakeWaitingProcedures(namespaceLock);
591      }
592      addToRunQueue(tableRunQueue, getTableQueue(table),
593        () -> procedure + " released the exclusive lock");
594      wakePollIfNeeded(waitingCount);
595    } finally {
596      schedUnlock();
597    }
598  }
599
600  /**
601   * Suspend the procedure if the specified table is already locked.
602   * other "read" operations in the table-queue may be executed concurrently,
603   * @param procedure the procedure trying to acquire the lock
604   * @param table Table to lock
605   * @return true if the procedure has to wait for the table to be available
606   */
607  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
608    return waitTableQueueSharedLock(procedure, table) == null;
609  }
610
611  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
612    schedLock();
613    try {
614      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
615      final LockAndQueue tableLock = locking.getTableLock(table);
616      if (!namespaceLock.trySharedLock(procedure)) {
617        waitProcedure(namespaceLock, procedure);
618        return null;
619      }
620
621      if (!tableLock.trySharedLock(procedure)) {
622        namespaceLock.releaseSharedLock();
623        waitProcedure(tableLock, procedure);
624        return null;
625      }
626
627      return getTableQueue(table);
628    } finally {
629      schedUnlock();
630    }
631  }
632
633  /**
634   * Wake the procedures waiting for the specified table
635   * @param procedure the procedure releasing the lock
636   * @param table the name of the table that has the shared lock
637   */
638  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
639    schedLock();
640    try {
641      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
642      final LockAndQueue tableLock = locking.getTableLock(table);
643      int waitingCount = 0;
644      if (tableLock.releaseSharedLock()) {
645        addToRunQueue(tableRunQueue, getTableQueue(table),
646          () -> procedure + " released the shared lock");
647        waitingCount += wakeWaitingProcedures(tableLock);
648      }
649      if (namespaceLock.releaseSharedLock()) {
650        waitingCount += wakeWaitingProcedures(namespaceLock);
651      }
652      wakePollIfNeeded(waitingCount);
653    } finally {
654      schedUnlock();
655    }
656  }
657
658  /**
659   * Tries to remove the queue and the table-lock of the specified table.
660   * If there are new operations pending (e.g. a new create),
661   * the remove will not be performed.
662   * @param table the name of the table that should be marked as deleted
663   * @param procedure the procedure that is removing the table
664   * @return true if deletion succeeded, false otherwise meaning that there are
665   *     other new operations pending for that table (e.g. a new create).
666   */
667  @VisibleForTesting
668  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
669    schedLock();
670    try {
671      final TableQueue queue = getTableQueue(table);
672      final LockAndQueue tableLock = locking.getTableLock(table);
673      if (queue == null) return true;
674
675      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
676        // remove the table from the run-queue and the map
677        if (AvlIterableList.isLinked(queue)) {
678          tableRunQueue.remove(queue);
679        }
680        removeTableQueue(table);
681      } else {
682        // TODO: If there are no create, we can drop all the other ops
683        return false;
684      }
685    } finally {
686      schedUnlock();
687    }
688    return true;
689  }
690
691  // ============================================================================
692  //  Region Locking Helpers
693  // ============================================================================
694  /**
695   * Suspend the procedure if the specified region is already locked.
696   * @param procedure the procedure trying to acquire the lock on the region
697   * @param regionInfo the region we are trying to lock
698   * @return true if the procedure has to wait for the regions to be available
699   */
700  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
701    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
702  }
703
  /**
   * Suspend the procedure if the specified set of regions are already locked.
   * <p>
   * The shared lock of the table is taken first, then the exclusive lock of every region. If a
   * region lock cannot be taken, the procedure is put on the wait queue of that lock, the region
   * locks taken so far are released and the table shared lock is given back.
   * <p>
   * Note that the passed array is sorted in place with {@link RegionInfo#COMPARATOR}.
   * @param procedure the procedure trying to acquire the lock on the regions
   * @param table the table name of the regions we are trying to lock
   * @param regionInfo the list of regions we are trying to lock
   * @return true if the procedure has to wait for the regions to be available
   */
  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
      final RegionInfo... regionInfo) {
    // sort so that the region locks are always requested in the same order
    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
    schedLock();
    try {
      assert table != null;
      if (waitTableSharedLock(procedure, table)) {
        return true;
      }

      // acquire region xlocks or wait
      boolean hasLock = true;
      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
      for (int i = 0; i < regionInfo.length; ++i) {
        assert regionInfo[i] != null;
        assert regionInfo[i].getTable() != null;
        assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
        // reference comparison: only catches the same instance passed twice in a row
        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];

        regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName());
        if (!regionLocks[i].tryExclusiveLock(procedure)) {
          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
              regionLocks[i].getExclusiveLockProcIdOwner());
          waitProcedure(regionLocks[i], procedure);
          hasLock = false;
          // roll back: release the region locks acquired before the failing one
          while (i-- > 0) {
            regionLocks[i].releaseExclusiveLock(procedure);
          }
          break;
        } else {
          LOG.info("Took xlock for {}", procedure);
        }
      }

      if (!hasLock) {
        // give back the table shared lock taken at the top of this method
        wakeTableSharedLock(procedure, table);
      }
      return !hasLock;
    } finally {
      schedUnlock();
    }
  }
753
754  /**
755   * Wake the procedures waiting for the specified region
756   * @param procedure the procedure that was holding the region
757   * @param regionInfo the region the procedure was holding
758   */
759  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
760    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
761  }
762
  /**
   * Wake the procedures waiting for the specified regions
   * <p>
   * For every region whose exclusive lock release is reported as successful, either the first
   * waiting procedure is woken or, when nobody waits, the region lock is removed. Finally the
   * table shared lock is released.
   * <p>
   * Note that the passed array is sorted in place with {@link RegionInfo#COMPARATOR}.
   * @param procedure the procedure that was holding the regions
   * @param table the table name of the regions that were held
   * @param regionInfo the list of regions the procedure was holding
   */
  public void wakeRegions(final Procedure<?> procedure,final TableName table,
      final RegionInfo... regionInfo) {
    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
    schedLock();
    try {
      int numProcs = 0;
      // at most one waiting procedure is collected per region
      final Procedure<?>[] nextProcs = new Procedure[regionInfo.length];
      for (int i = 0; i < regionInfo.length; ++i) {
        assert regionInfo[i].getTable().equals(table);
        // reference comparison: only catches the same instance passed twice in a row
        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];

        LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
        if (regionLock.releaseExclusiveLock(procedure)) {
          if (!regionLock.isWaitingQueueEmpty()) {
            // release one procedure at the time since regions has an xlock
            nextProcs[numProcs++] = regionLock.removeFirst();
          } else {
            // nobody is waiting, the lock object is not needed anymore
            locking.removeRegionLock(regionInfo[i].getEncodedName());
          }
        }
      }

      // awake procedures if any
      // (iterated in reverse order of collection)
      for (int i = numProcs - 1; i >= 0; --i) {
        wakeProcedure(nextProcs[i]);
      }
      wakePollIfNeeded(numProcs);
      // release the table shared-lock.
      wakeTableSharedLock(procedure, table);
    } finally {
      schedUnlock();
    }
  }
801
802  // ============================================================================
803  //  Namespace Locking Helpers
804  // ============================================================================
805  /**
806   * Suspend the procedure if the specified namespace is already locked.
807   * @see #wakeNamespaceExclusiveLock(Procedure,String)
808   * @param procedure the procedure trying to acquire the lock
809   * @param namespace Namespace to lock
810   * @return true if the procedure has to wait for the namespace to be available
811   */
812  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
813    schedLock();
814    try {
815      final LockAndQueue systemNamespaceTableLock =
816        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
817      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
818        waitProcedure(systemNamespaceTableLock, procedure);
819        logLockedResource(LockedResourceType.TABLE,
820          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
821        return true;
822      }
823
824      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
825      if (!namespaceLock.tryExclusiveLock(procedure)) {
826        systemNamespaceTableLock.releaseSharedLock();
827        waitProcedure(namespaceLock, procedure);
828        logLockedResource(LockedResourceType.NAMESPACE, namespace);
829        return true;
830      }
831      return false;
832    } finally {
833      schedUnlock();
834    }
835  }
836
837  /**
838   * Wake the procedures waiting for the specified namespace
839   * @see #waitNamespaceExclusiveLock(Procedure,String)
840   * @param procedure the procedure releasing the lock
841   * @param namespace the namespace that has the exclusive lock
842   */
843  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
844    schedLock();
845    try {
846      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
847      final LockAndQueue systemNamespaceTableLock =
848          locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
849      int waitingCount = 0;
850      if (namespaceLock.releaseExclusiveLock(procedure)) {
851        waitingCount += wakeWaitingProcedures(namespaceLock);
852      }
853      if (systemNamespaceTableLock.releaseSharedLock()) {
854        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
855          () -> procedure + " released namespace exclusive lock");
856        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
857      }
858      wakePollIfNeeded(waitingCount);
859    } finally {
860      schedUnlock();
861    }
862  }
863
864  // ============================================================================
865  //  Server Locking Helpers
866  // ============================================================================
867  /**
868   * Try to acquire the exclusive lock on the specified server.
869   * @see #wakeServerExclusiveLock(Procedure,ServerName)
870   * @param procedure the procedure trying to acquire the lock
871   * @param serverName Server to lock
872   * @return true if the procedure has to wait for the server to be available
873   */
874  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
875      final ServerName serverName) {
876    schedLock();
877    try {
878      final LockAndQueue lock = locking.getServerLock(serverName);
879      if (lock.tryExclusiveLock(procedure)) {
880        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
881        // so.
882        removeFromRunQueue(serverRunQueue,
883          getServerQueue(serverName,
884            procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
885              : null),
886          () -> procedure + " held exclusive lock");
887        return false;
888      }
889      waitProcedure(lock, procedure);
890      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
891      return true;
892    } finally {
893      schedUnlock();
894    }
895  }
896
897  /**
898   * Wake the procedures waiting for the specified server
899   * @see #waitServerExclusiveLock(Procedure,ServerName)
900   * @param procedure the procedure releasing the lock
901   * @param serverName the server that has the exclusive lock
902   */
903  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
904    schedLock();
905    try {
906      final LockAndQueue lock = locking.getServerLock(serverName);
907      // Only SCP will acquire/release server lock so do not need to check the return value here.
908      lock.releaseExclusiveLock(procedure);
909      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
910      // so.
911      addToRunQueue(serverRunQueue,
912        getServerQueue(serverName,
913          procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
914            : null), () -> procedure + " released exclusive lock");
915      int waitingCount = wakeWaitingProcedures(lock);
916      wakePollIfNeeded(waitingCount);
917    } finally {
918      schedUnlock();
919    }
920  }
921
922  // ============================================================================
923  //  Peer Locking Helpers
924  // ============================================================================
925  private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
926    return proc.getPeerOperationType() != PeerOperationType.REFRESH;
927  }
928
929  /**
930   * Try to acquire the exclusive lock on the specified peer.
931   * @see #wakePeerExclusiveLock(Procedure, String)
932   * @param procedure the procedure trying to acquire the lock
933   * @param peerId peer to lock
934   * @return true if the procedure has to wait for the peer to be available
935   */
936  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
937    schedLock();
938    try {
939      final LockAndQueue lock = locking.getPeerLock(peerId);
940      if (lock.tryExclusiveLock(procedure)) {
941        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
942          () -> procedure + " held exclusive lock");
943        return false;
944      }
945      waitProcedure(lock, procedure);
946      logLockedResource(LockedResourceType.PEER, peerId);
947      return true;
948    } finally {
949      schedUnlock();
950    }
951  }
952
953  /**
954   * Wake the procedures waiting for the specified peer
955   * @see #waitPeerExclusiveLock(Procedure, String)
956   * @param procedure the procedure releasing the lock
957   * @param peerId the peer that has the exclusive lock
958   */
959  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
960    schedLock();
961    try {
962      final LockAndQueue lock = locking.getPeerLock(peerId);
963      if (lock.releaseExclusiveLock(procedure)) {
964        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
965          () -> procedure + " released exclusive lock");
966        int waitingCount = wakeWaitingProcedures(lock);
967        wakePollIfNeeded(waitingCount);
968      }
969    } finally {
970      schedUnlock();
971    }
972  }
973
974  // ============================================================================
975  // Meta Locking Helpers
976  // ============================================================================
977  /**
978   * Try to acquire the exclusive lock on meta.
979   * @see #wakeMetaExclusiveLock(Procedure)
980   * @param procedure the procedure trying to acquire the lock
981   * @return true if the procedure has to wait for meta to be available
982   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
983   *             {@link RecoverMetaProcedure}.
984   */
985  @Deprecated
986  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
987    schedLock();
988    try {
989      final LockAndQueue lock = locking.getMetaLock();
990      if (lock.tryExclusiveLock(procedure)) {
991        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
992        return false;
993      }
994      waitProcedure(lock, procedure);
995      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
996      return true;
997    } finally {
998      schedUnlock();
999    }
1000  }
1001
1002  /**
1003   * Wake the procedures waiting for meta.
1004   * @see #waitMetaExclusiveLock(Procedure)
1005   * @param procedure the procedure releasing the lock
1006   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1007   *             {@link RecoverMetaProcedure}.
1008   */
1009  @Deprecated
1010  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1011    schedLock();
1012    try {
1013      final LockAndQueue lock = locking.getMetaLock();
1014      lock.releaseExclusiveLock(procedure);
1015      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1016      int waitingCount = wakeWaitingProcedures(lock);
1017      wakePollIfNeeded(waitingCount);
1018    } finally {
1019      schedUnlock();
1020    }
1021  }
1022
1023  /**
1024   * For debugging. Expensive.
1025   */
1026  @VisibleForTesting
1027  public String dumpLocks() throws IOException {
1028    schedLock();
1029    try {
1030      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1031      return this.locking.toString();
1032    } finally {
1033      schedUnlock();
1034    }
1035  }
1036}