001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Optional;
027import java.util.function.Consumer;
028import java.util.function.Function;
029import java.util.function.Supplier;
030import org.apache.commons.lang3.builder.ToStringBuilder;
031import org.apache.commons.lang3.builder.ToStringStyle;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableExistsException;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.TableNotFoundException;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
039import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
040import org.apache.hadoop.hbase.procedure2.LockAndQueue;
041import org.apache.hadoop.hbase.procedure2.LockStatus;
042import org.apache.hadoop.hbase.procedure2.LockedResource;
043import org.apache.hadoop.hbase.procedure2.LockedResourceType;
044import org.apache.hadoop.hbase.procedure2.Procedure;
045import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
046import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
047import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
048import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the
055 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the
056 * master operations can be executed concurrently, if they are operating on different tables (e.g.
057 * two create table procedures can be performed at the same time) or against two different servers;
058 * say two servers that crashed at about the same time.
059 * <p>
060 * Each procedure should implement an Interface providing information for this queue. For example
061 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed
062 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can
063 * abort all the operations preceding a delete table, or similar.
064 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue,
065 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). This mainly
066 * includes:<br>
067 * <ul>
068 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when:
069 * <ol>
070 * <li>Queue was empty before push (so must have been out of run-queue)</li>
071 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have
072 * moved Queue out of run-queue)</li>
073 * </ol>
074 * </li>
075 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when:
076 * <ol>
077 * <li>Queue becomes empty after poll</li>
078 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the
079 * procedure)</li>
080 * <li>Exclusive lock is requested but lock is not available (returns null)</li>
081 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a
082 * child</li>
083 * </ol>
084 * </li>
085 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
086 * <ol>
087 * <li>Exclusive lock</li>
088 * <li>Last shared lock (in case queue was removed because next procedure in queue required
089 * exclusive lock)</li>
090 * </ol>
091 * </li>
092 * </ul>
093 */
094@InterfaceAudience.Private
095public class MasterProcedureScheduler extends AbstractProcedureScheduler {
096  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
097
098  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
099    (n, k) -> n.compareKey((ServerName) k);
100  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
101    (n, k) -> n.compareKey((TableName) k);
102  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
103    (n, k) -> n.compareKey((String) k);
104  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
105    (n, k) -> n.compareKey((TableName) k);
106  private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR =
107    (n, k) -> n.compareKey((String) k);
108
109  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
110  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
111  private final FairQueue<String> peerRunQueue = new FairQueue<>();
112  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
113  private final FairQueue<String> globalRunQueue = new FairQueue<>();
114
115  private final ServerQueue[] serverBuckets = new ServerQueue[128];
116  private TableQueue tableMap = null;
117  private PeerQueue peerMap = null;
118  private MetaQueue metaMap = null;
119  private GlobalQueue globalMap = null;
120
121  private final Function<Long, Procedure<?>> procedureRetriever;
122  private final SchemaLocking locking;
123
124  // To prevent multiple Create/Modify/Disable/Enable table procedure run at the same time, we will
125  // keep table procedure in this queue first before actually enqueuing it to tableQueue
126  // Seee HBASE-28683 for more details
127  private final Map<TableName, TableProcedureWaitingQueue> tableProcsWaitingEnqueue =
128    new HashMap<>();
129
130  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
131    this.procedureRetriever = procedureRetriever;
132    locking = new SchemaLocking(procedureRetriever);
133  }
134
135  @Override
136  public void yield(final Procedure proc) {
137    push(proc, false, true);
138  }
139
140  private boolean shouldWaitBeforeEnqueuing(TableProcedureInterface proc) {
141    return TableQueue.requireTableExclusiveLock(proc);
142  }
143
144  @Override
145  protected void enqueue(final Procedure proc, final boolean addFront) {
146    if (isMetaProcedure(proc)) {
147      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
148    } else if (isTableProcedure(proc)) {
149      TableProcedureInterface tableProc = (TableProcedureInterface) proc;
150      if (shouldWaitBeforeEnqueuing(tableProc)) {
151        TableProcedureWaitingQueue waitingQueue = tableProcsWaitingEnqueue.computeIfAbsent(
152          tableProc.getTableName(), k -> new TableProcedureWaitingQueue(procedureRetriever));
153        if (!waitingQueue.procedureSubmitted(proc)) {
154          // there is a table procedure for this table already enqueued, waiting
155          LOG.debug("There is already a procedure running for table {}, added {} to waiting queue",
156            tableProc.getTableName(), proc);
157          return;
158        }
159      }
160      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
161    } else if (isServerProcedure(proc)) {
162      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
163      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
164    } else if (isPeerProcedure(proc)) {
165      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
166    } else if (isGlobalProcedure(proc)) {
167      doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
168    } else {
169      // TODO: at the moment we only have Table and Server procedures
170      // if you are implementing a non-table/non-server procedure, you have two options: create
171      // a group for all the non-table/non-server procedures or try to find a key for your
172      // non-table/non-server procedures and implement something similar to the TableRunQueue.
173      throw new UnsupportedOperationException(
174        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
175    }
176  }
177
178  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
179    Procedure<?> proc, boolean addFront) {
180    queue.add(proc, addFront);
181    // For the following conditions, we will put the queue back into execution
182    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
183    // which means it can be executed immediately.
184    // 2. The exclusive lock for this queue has not been held.
185    // 3. The given procedure has the exclusive lock permission for this queue.
186    Supplier<String> reason = null;
187    if (proc.hasLock()) {
188      reason = () -> proc + " has lock";
189    } else if (proc.isLockedWhenLoading()) {
190      reason = () -> proc + " restores lock when restarting";
191    } else if (!queue.getLockStatus().hasExclusiveLock()) {
192      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
193    } else if (queue.getLockStatus().hasLockAccess(proc)) {
194      reason = () -> proc + " has the excusive lock access";
195    }
196    if (reason != null) {
197      addToRunQueue(fairq, queue, reason);
198    }
199  }
200
201  @Override
202  protected boolean queueHasRunnables() {
203    return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
204      || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
205      || peerRunQueue.hasRunnables();
206  }
207
208  @Override
209  protected Procedure dequeue() {
210    // pull global first
211    Procedure<?> pollResult = doPoll(globalRunQueue);
212    // then meta procedure
213    if (pollResult == null) {
214      pollResult = doPoll(metaRunQueue);
215    }
216    // For now, let server handling have precedence over table handling; presumption is that it
217    // is more important handling crashed servers than it is running the
218    // enabling/disabling tables, etc.
219    if (pollResult == null) {
220      pollResult = doPoll(serverRunQueue);
221    }
222    if (pollResult == null) {
223      pollResult = doPoll(peerRunQueue);
224    }
225    if (pollResult == null) {
226      pollResult = doPoll(tableRunQueue);
227    }
228    return pollResult;
229  }
230
231  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
232    LockStatus s = rq.getLockStatus();
233    // if we have the lock access, we are ready
234    if (s.hasLockAccess(proc)) {
235      return true;
236    }
237    boolean xlockReq = rq.requireExclusiveLock(proc);
238    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
239    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
240    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
241  }
242
243  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
244    Queue<T> rq = fairq.poll();
245    if (rq == null || !rq.isAvailable()) {
246      return null;
247    }
248    // loop until we find out a procedure which is ready to run, or if we have checked all the
249    // procedures, then we give up and remove the queue from run queue.
250    for (int i = 0, n = rq.size(); i < n; i++) {
251      Procedure<?> proc = rq.poll();
252      if (isLockReady(proc, rq)) {
253        // the queue is empty, remove from run queue
254        if (rq.isEmpty()) {
255          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
256        }
257        return proc;
258      }
259      // we are not ready to run, add back and try the next procedure
260      rq.add(proc, false);
261    }
262    // no procedure is ready for execution, remove from run queue
263    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
264    return null;
265  }
266
267  @Override
268  public List<LockedResource> getLocks() {
269    schedLock();
270    try {
271      return locking.getLocks();
272    } finally {
273      schedUnlock();
274    }
275  }
276
277  @Override
278  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
279    schedLock();
280    try {
281      return locking.getLockResource(resourceType, resourceName);
282    } finally {
283      schedUnlock();
284    }
285  }
286
287  @Override
288  public void clear() {
289    schedLock();
290    try {
291      clearQueue();
292      locking.clear();
293    } finally {
294      schedUnlock();
295    }
296  }
297
298  private void clearQueue() {
299    // Remove Servers
300    for (int i = 0; i < serverBuckets.length; ++i) {
301      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
302      serverBuckets[i] = null;
303    }
304
305    // Remove Tables
306    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
307    tableMap = null;
308    tableProcsWaitingEnqueue.clear();
309
310    // Remove Peers
311    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
312    peerMap = null;
313
314    // Remove Meta
315    clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
316    metaMap = null;
317
318    // Remove Global
319    clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
320    globalMap = null;
321
322    assert size() == 0 : "expected queue size to be 0, got " + size();
323  }
324
325  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
326    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
327    while (treeMap != null) {
328      Queue<T> node = AvlTree.getFirst(treeMap);
329      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
330      if (fairq != null) {
331        removeFromRunQueue(fairq, node, () -> "clear all queues");
332      }
333    }
334  }
335
336  private int queueSize(Queue<?> head) {
337    int count = 0;
338    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
339    while (iter.hasNext()) {
340      count += iter.next().size();
341    }
342    return count;
343  }
344
345  @Override
346  protected int queueSize() {
347    int count = 0;
348    for (ServerQueue serverMap : serverBuckets) {
349      count += queueSize(serverMap);
350    }
351    count += queueSize(tableMap);
352    count += queueSize(peerMap);
353    count += queueSize(metaMap);
354    count += queueSize(globalMap);
355    for (TableProcedureWaitingQueue waitingQ : tableProcsWaitingEnqueue.values()) {
356      count += waitingQ.waitingSize();
357    }
358    return count;
359  }
360
361  @Override
362  public void completionCleanup(final Procedure proc) {
363    if (isTableProcedure(proc)) {
364      TableProcedureInterface tableProc = (TableProcedureInterface) proc;
365      if (shouldWaitBeforeEnqueuing(tableProc)) {
366        schedLock();
367        try {
368          TableProcedureWaitingQueue waitingQueue =
369            tableProcsWaitingEnqueue.get(tableProc.getTableName());
370          if (waitingQueue != null) {
371            Optional<Procedure<?>> nextProc = waitingQueue.procedureCompleted(proc);
372            if (nextProc.isPresent()) {
373              // enqueue it
374              Procedure<?> next = nextProc.get();
375              LOG.debug("{} completed, enqueue a new procedure {}", proc, next);
376              doAdd(tableRunQueue, getTableQueue(tableProc.getTableName()), next, false);
377            } else {
378              if (waitingQueue.isEmpty()) {
379                // there is no waiting procedures in it, remove
380                tableProcsWaitingEnqueue.remove(tableProc.getTableName());
381              }
382            }
383          } else {
384            // this should not happen normally, warn it
385            LOG.warn("no waiting queue while completing {}, which should not happen", proc);
386          }
387        } finally {
388          schedUnlock();
389        }
390      }
391      boolean tableDeleted;
392      if (proc.hasException()) {
393        Exception procEx = proc.getException().unwrapRemoteException();
394        if (tableProc.getTableOperationType() == TableOperationType.CREATE) {
395          // create failed because the table already exist
396          tableDeleted = !(procEx instanceof TableExistsException);
397        } else {
398          // the operation failed because the table does not exist
399          tableDeleted = (procEx instanceof TableNotFoundException);
400        }
401      } else {
402        // the table was deleted
403        tableDeleted = (tableProc.getTableOperationType() == TableOperationType.DELETE);
404      }
405      if (tableDeleted) {
406        markTableAsDeleted(tableProc.getTableName(), proc);
407      }
408    } else if (proc instanceof PeerProcedureInterface) {
409      tryCleanupPeerQueue(getPeerId(proc), proc);
410    } else if (proc instanceof ServerProcedureInterface) {
411      tryCleanupServerQueue(getServerName(proc), proc);
412    } else if (proc instanceof GlobalProcedureInterface) {
413      tryCleanupGlobalQueue(getGlobalId(proc), proc);
414    } else {
415      // No cleanup for other procedure types, yet.
416      return;
417    }
418  }
419
420  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
421    Supplier<String> reason) {
422    if (LOG.isTraceEnabled()) {
423      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
424    }
425    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
426      fairq.add(queue);
427    }
428  }
429
430  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
431    Queue<T> queue, Supplier<String> reason) {
432    if (LOG.isTraceEnabled()) {
433      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
434    }
435    if (AvlIterableList.isLinked(queue)) {
436      fairq.remove(queue);
437    }
438  }
439
440  // ============================================================================
441  // Table Queue Lookup Helpers
442  // ============================================================================
443  private TableQueue getTableQueue(TableName tableName) {
444    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
445    if (node != null) {
446      return node;
447    }
448
449    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
450      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
451    tableMap = AvlTree.insert(tableMap, node);
452    return node;
453  }
454
455  private void removeTableQueue(TableName tableName) {
456    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
457    locking.removeTableLock(tableName);
458  }
459
460  /**
461   * Tries to remove the queue and the table-lock of the specified table. If there are new
462   * operations pending (e.g. a new create), the remove will not be performed.
463   * @param table     the name of the table that should be marked as deleted
464   * @param procedure the procedure that is removing the table
465   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
466   *         pending for that table (e.g. a new create).
467   */
468  @RestrictedApi(explanation = "Should only be called in tests", link = "",
469      allowedOnPath = ".*/(MasterProcedureScheduler.java|src/test/.*)")
470  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
471    return tryCleanupQueue(table, procedure, () -> tableMap, TABLE_QUEUE_KEY_COMPARATOR,
472      locking::getTableLock, tableRunQueue, this::removeTableQueue);
473  }
474
475  private static boolean isTableProcedure(Procedure<?> proc) {
476    return proc instanceof TableProcedureInterface;
477  }
478
479  private static TableName getTableName(Procedure<?> proc) {
480    return ((TableProcedureInterface) proc).getTableName();
481  }
482
483  // ============================================================================
484  // Server Queue Lookup Helpers
485  // ============================================================================
486  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
487    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
488    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
489    if (node != null) {
490      return node;
491    }
492    int priority;
493    if (proc != null) {
494      priority = MasterProcedureUtil.getServerPriority(proc);
495    } else {
496      priority = 1;
497    }
498    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
499    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
500    return node;
501  }
502
503  private void removeServerQueue(ServerName serverName) {
504    int index = getBucketIndex(serverBuckets, serverName.hashCode());
505    serverBuckets[index] =
506      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
507    locking.removeServerLock(serverName);
508  }
509
510  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
511    // serverBuckets
512    tryCleanupQueue(serverName, proc,
513      () -> serverBuckets[getBucketIndex(serverBuckets, serverName.hashCode())],
514      SERVER_QUEUE_KEY_COMPARATOR, locking::getServerLock, serverRunQueue, this::removeServerQueue);
515  }
516
517  private static int getBucketIndex(Object[] buckets, int hashCode) {
518    return Math.abs(hashCode) % buckets.length;
519  }
520
521  private static boolean isServerProcedure(Procedure<?> proc) {
522    return proc instanceof ServerProcedureInterface;
523  }
524
525  private static ServerName getServerName(Procedure<?> proc) {
526    return ((ServerProcedureInterface) proc).getServerName();
527  }
528
529  // ============================================================================
530  // Peer Queue Lookup Helpers
531  // ============================================================================
532  private PeerQueue getPeerQueue(String peerId) {
533    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
534    if (node != null) {
535      return node;
536    }
537    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
538    peerMap = AvlTree.insert(peerMap, node);
539    return node;
540  }
541
542  private void removePeerQueue(String peerId) {
543    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
544    locking.removePeerLock(peerId);
545  }
546
547  private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
548    tryCleanupQueue(peerId, procedure, () -> peerMap, PEER_QUEUE_KEY_COMPARATOR,
549      locking::getPeerLock, peerRunQueue, this::removePeerQueue);
550  }
551
552  private static boolean isPeerProcedure(Procedure<?> proc) {
553    return proc instanceof PeerProcedureInterface;
554  }
555
556  private static String getPeerId(Procedure<?> proc) {
557    return ((PeerProcedureInterface) proc).getPeerId();
558  }
559
560  // ============================================================================
561  // Meta Queue Lookup Helpers
562  // ============================================================================
563  private MetaQueue getMetaQueue() {
564    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
565    if (node != null) {
566      return node;
567    }
568    node = new MetaQueue(locking.getMetaLock());
569    metaMap = AvlTree.insert(metaMap, node);
570    return node;
571  }
572
573  private static boolean isMetaProcedure(Procedure<?> proc) {
574    return proc instanceof MetaProcedureInterface;
575  }
576
577  // ============================================================================
578  // Global Queue Lookup Helpers
579  // ============================================================================
580  private GlobalQueue getGlobalQueue(String globalId) {
581    GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
582    if (node != null) {
583      return node;
584    }
585    node = new GlobalQueue(globalId, locking.getGlobalLock(globalId));
586    globalMap = AvlTree.insert(globalMap, node);
587    return node;
588  }
589
590  private void removeGlobalQueue(String globalId) {
591    globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
592    locking.removeGlobalLock(globalId);
593  }
594
595  private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) {
596    tryCleanupQueue(globalId, procedure, () -> globalMap, GLOBAL_QUEUE_KEY_COMPARATOR,
597      locking::getGlobalLock, globalRunQueue, this::removeGlobalQueue);
598  }
599
600  private static boolean isGlobalProcedure(Procedure<?> proc) {
601    return proc instanceof GlobalProcedureInterface;
602  }
603
604  private static String getGlobalId(Procedure<?> proc) {
605    return ((GlobalProcedureInterface) proc).getGlobalId();
606  }
607
608  private <T extends Comparable<T>, TNode extends Queue<T>> boolean tryCleanupQueue(T id,
609    Procedure<?> proc, Supplier<TNode> getMap, AvlKeyComparator<TNode> comparator,
610    Function<T, LockAndQueue> getLock, FairQueue<T> runQueue, Consumer<T> removeQueue) {
611    schedLock();
612    try {
613      Queue<T> queue = AvlTree.get(getMap.get(), id, comparator);
614      if (queue == null) {
615        return true;
616      }
617
618      LockAndQueue lock = getLock.apply(id);
619      if (queue.isEmpty() && lock.isWaitingQueueEmpty() && !lock.isLocked()) {
620        // 1. the queue is empty
621        // 2. no procedure is in the lock's waiting queue
622        // 3. no other one holds the lock. It is possible that someone else holds the lock, usually
623        // our parent procedures
624        // If we can meet all the above conditions, it is safe for us to remove the queue
625        removeFromRunQueue(runQueue, queue, () -> "clean up queue after " + proc + " completed");
626        removeQueue.accept(id);
627        return true;
628      } else {
629        return false;
630      }
631
632    } finally {
633      schedUnlock();
634    }
635  }
636
637  // ============================================================================
638  // Table Locking Helpers
639  // ============================================================================
640  /**
641   * Get lock info for a resource of specified type and name and log details
642   */
643  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
644    if (!LOG.isDebugEnabled()) {
645      return;
646    }
647
648    LockedResource lockedResource = getLockResource(resourceType, resourceName);
649    if (lockedResource != null) {
650      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count="
651        + lockedResource.getSharedLockCount();
652
653      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
654      if (proc != null) {
655        msg += ", exclusively locked by procId=" + proc.getProcId();
656      }
657      LOG.debug(msg);
658    }
659  }
660
661  /**
662   * Suspend the procedure if the specified table is already locked. Other operations in the
663   * table-queue will be executed after the lock is released.
664   * @param procedure the procedure trying to acquire the lock
665   * @param table     Table to lock
666   * @return true if the procedure has to wait for the table to be available
667   */
668  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
669    schedLock();
670    try {
671      final String namespace = table.getNamespaceAsString();
672      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
673      final LockAndQueue tableLock = locking.getTableLock(table);
674      if (!namespaceLock.trySharedLock(procedure)) {
675        waitProcedure(namespaceLock, procedure);
676        logLockedResource(LockedResourceType.NAMESPACE, namespace);
677        return true;
678      }
679      if (!tableLock.tryExclusiveLock(procedure)) {
680        namespaceLock.releaseSharedLock();
681        waitProcedure(tableLock, procedure);
682        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
683        return true;
684      }
685      removeFromRunQueue(tableRunQueue, getTableQueue(table),
686        () -> procedure + " held the exclusive lock");
687      return false;
688    } finally {
689      schedUnlock();
690    }
691  }
692
693  /**
694   * Wake the procedures waiting for the specified table
695   * @param procedure the procedure releasing the lock
696   * @param table     the name of the table that has the exclusive lock
697   */
698  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
699    schedLock();
700    try {
701      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
702      final LockAndQueue tableLock = locking.getTableLock(table);
703      int waitingCount = 0;
704      if (tableLock.releaseExclusiveLock(procedure)) {
705        waitingCount += wakeWaitingProcedures(tableLock);
706      }
707      if (namespaceLock.releaseSharedLock()) {
708        waitingCount += wakeWaitingProcedures(namespaceLock);
709      }
710      addToRunQueue(tableRunQueue, getTableQueue(table),
711        () -> procedure + " released the exclusive lock");
712      wakePollIfNeeded(waitingCount);
713    } finally {
714      schedUnlock();
715    }
716  }
717
718  /**
719   * Suspend the procedure if the specified table is already locked. other "read" operations in the
720   * table-queue may be executed concurrently,
721   * @param procedure the procedure trying to acquire the lock
722   * @param table     Table to lock
723   * @return true if the procedure has to wait for the table to be available
724   */
725  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
726    return waitTableQueueSharedLock(procedure, table) == null;
727  }
728
729  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
730    schedLock();
731    try {
732      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
733      final LockAndQueue tableLock = locking.getTableLock(table);
734      if (!namespaceLock.trySharedLock(procedure)) {
735        waitProcedure(namespaceLock, procedure);
736        return null;
737      }
738
739      if (!tableLock.trySharedLock(procedure)) {
740        namespaceLock.releaseSharedLock();
741        waitProcedure(tableLock, procedure);
742        return null;
743      }
744
745      return getTableQueue(table);
746    } finally {
747      schedUnlock();
748    }
749  }
750
751  /**
752   * Wake the procedures waiting for the specified table
753   * @param procedure the procedure releasing the lock
754   * @param table     the name of the table that has the shared lock
755   */
756  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
757    schedLock();
758    try {
759      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
760      final LockAndQueue tableLock = locking.getTableLock(table);
761      int waitingCount = 0;
762      if (tableLock.releaseSharedLock()) {
763        addToRunQueue(tableRunQueue, getTableQueue(table),
764          () -> procedure + " released the shared lock");
765        waitingCount += wakeWaitingProcedures(tableLock);
766      }
767      if (namespaceLock.releaseSharedLock()) {
768        waitingCount += wakeWaitingProcedures(namespaceLock);
769      }
770      wakePollIfNeeded(waitingCount);
771    } finally {
772      schedUnlock();
773    }
774  }
775
776  // ============================================================================
777  // Region Locking Helpers
778  // ============================================================================
779  /**
780   * Suspend the procedure if the specified region is already locked.
781   * @param procedure  the procedure trying to acquire the lock on the region
782   * @param regionInfo the region we are trying to lock
783   * @return true if the procedure has to wait for the regions to be available
784   */
785  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
786    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
787  }
788
789  /**
790   * Suspend the procedure if the specified set of regions are already locked.
791   * @param procedure   the procedure trying to acquire the lock on the regions
792   * @param table       the table name of the regions we are trying to lock
793   * @param regionInfos the list of regions we are trying to lock
794   * @return true if the procedure has to wait for the regions to be available
795   */
796  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
797    final RegionInfo... regionInfos) {
798    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
799    schedLock();
800    try {
801      assert table != null;
802      if (waitTableSharedLock(procedure, table)) {
803        return true;
804      }
805
806      // acquire region xlocks or wait
807      boolean hasLock = true;
808      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length];
809      for (int i = 0; i < regionInfos.length; ++i) {
810        assert regionInfos[i] != null;
811        assert regionInfos[i].getTable() != null;
812        assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure;
813        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
814          : "duplicate region: " + regionInfos[i];
815
816        regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName());
817        if (!regionLocks[i].tryExclusiveLock(procedure)) {
818          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
819            regionLocks[i].getExclusiveLockProcIdOwner());
820          waitProcedure(regionLocks[i], procedure);
821          hasLock = false;
822          while (i-- > 0) {
823            regionLocks[i].releaseExclusiveLock(procedure);
824          }
825          break;
826        } else {
827          LOG.info("Took xlock for {}", procedure);
828        }
829      }
830
831      if (!hasLock) {
832        wakeTableSharedLock(procedure, table);
833      }
834      return !hasLock;
835    } finally {
836      schedUnlock();
837    }
838  }
839
840  /**
841   * Wake the procedures waiting for the specified region
842   * @param procedure  the procedure that was holding the region
843   * @param regionInfo the region the procedure was holding
844   */
845  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
846    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
847  }
848
849  /**
850   * Wake the procedures waiting for the specified regions
851   * @param procedure   the procedure that was holding the regions
852   * @param regionInfos the list of regions the procedure was holding
853   */
854  public void wakeRegions(final Procedure<?> procedure, final TableName table,
855    final RegionInfo... regionInfos) {
856    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
857    schedLock();
858    try {
859      int numProcs = 0;
860      final Procedure<?>[] nextProcs = new Procedure[regionInfos.length];
861      for (int i = 0; i < regionInfos.length; ++i) {
862        assert regionInfos[i].getTable().equals(table);
863        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
864          : "duplicate region: " + regionInfos[i];
865
866        LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName());
867        if (regionLock.releaseExclusiveLock(procedure)) {
868          if (!regionLock.isWaitingQueueEmpty()) {
869            // release one procedure at the time since regions has an xlock
870            nextProcs[numProcs++] = regionLock.removeFirst();
871          } else {
872            locking.removeRegionLock(regionInfos[i].getEncodedName());
873          }
874        }
875      }
876
877      // awake procedures if any
878      for (int i = numProcs - 1; i >= 0; --i) {
879        wakeProcedure(nextProcs[i]);
880      }
881      wakePollIfNeeded(numProcs);
882      // release the table shared-lock.
883      wakeTableSharedLock(procedure, table);
884    } finally {
885      schedUnlock();
886    }
887  }
888
889  // ============================================================================
890  // Namespace Locking Helpers
891  // ============================================================================
892  /**
893   * Suspend the procedure if the specified namespace is already locked.
894   * @see #wakeNamespaceExclusiveLock(Procedure,String)
895   * @param procedure the procedure trying to acquire the lock
896   * @param namespace Namespace to lock
897   * @return true if the procedure has to wait for the namespace to be available
898   */
899  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
900    schedLock();
901    try {
902      final LockAndQueue systemNamespaceTableLock =
903        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
904      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
905        waitProcedure(systemNamespaceTableLock, procedure);
906        logLockedResource(LockedResourceType.TABLE,
907          TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString());
908        return true;
909      }
910
911      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
912      if (!namespaceLock.tryExclusiveLock(procedure)) {
913        systemNamespaceTableLock.releaseSharedLock();
914        waitProcedure(namespaceLock, procedure);
915        logLockedResource(LockedResourceType.NAMESPACE, namespace);
916        return true;
917      }
918      return false;
919    } finally {
920      schedUnlock();
921    }
922  }
923
924  /**
925   * Wake the procedures waiting for the specified namespace
926   * @see #waitNamespaceExclusiveLock(Procedure,String)
927   * @param procedure the procedure releasing the lock
928   * @param namespace the namespace that has the exclusive lock
929   */
930  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
931    schedLock();
932    try {
933      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
934      final LockAndQueue systemNamespaceTableLock =
935        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
936      int waitingCount = 0;
937      if (namespaceLock.releaseExclusiveLock(procedure)) {
938        waitingCount += wakeWaitingProcedures(namespaceLock);
939      }
940      if (systemNamespaceTableLock.releaseSharedLock()) {
941        addToRunQueue(tableRunQueue,
942          getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME),
943          () -> procedure + " released namespace exclusive lock");
944        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
945      }
946      wakePollIfNeeded(waitingCount);
947    } finally {
948      schedUnlock();
949    }
950  }
951
952  // ============================================================================
953  // Server Locking Helpers
954  // ============================================================================
955  /**
956   * Try to acquire the exclusive lock on the specified server.
957   * @see #wakeServerExclusiveLock(Procedure,ServerName)
958   * @param procedure  the procedure trying to acquire the lock
959   * @param serverName Server to lock
960   * @return true if the procedure has to wait for the server to be available
961   */
962  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
963    final ServerName serverName) {
964    schedLock();
965    try {
966      final LockAndQueue lock = locking.getServerLock(serverName);
967      if (lock.tryExclusiveLock(procedure)) {
968        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
969        // so.
970        removeFromRunQueue(serverRunQueue,
971          getServerQueue(serverName,
972            procedure instanceof ServerProcedureInterface
973              ? (ServerProcedureInterface) procedure
974              : null),
975          () -> procedure + " held exclusive lock");
976        return false;
977      }
978      waitProcedure(lock, procedure);
979      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
980      return true;
981    } finally {
982      schedUnlock();
983    }
984  }
985
986  /**
987   * Wake the procedures waiting for the specified server
988   * @see #waitServerExclusiveLock(Procedure,ServerName)
989   * @param procedure  the procedure releasing the lock
990   * @param serverName the server that has the exclusive lock
991   */
992  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
993    schedLock();
994    try {
995      final LockAndQueue lock = locking.getServerLock(serverName);
996      // Only SCP will acquire/release server lock so do not need to check the return value here.
997      lock.releaseExclusiveLock(procedure);
998      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
999      // so.
1000      addToRunQueue(serverRunQueue,
1001        getServerQueue(serverName,
1002          procedure instanceof ServerProcedureInterface
1003            ? (ServerProcedureInterface) procedure
1004            : null),
1005        () -> procedure + " released exclusive lock");
1006      int waitingCount = wakeWaitingProcedures(lock);
1007      wakePollIfNeeded(waitingCount);
1008    } finally {
1009      schedUnlock();
1010    }
1011  }
1012
1013  // ============================================================================
1014  // Peer Locking Helpers
1015  // ============================================================================
1016  /**
1017   * Try to acquire the exclusive lock on the specified peer.
1018   * @see #wakePeerExclusiveLock(Procedure, String)
1019   * @param procedure the procedure trying to acquire the lock
1020   * @param peerId    peer to lock
1021   * @return true if the procedure has to wait for the peer to be available
1022   */
1023  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
1024    schedLock();
1025    try {
1026      final LockAndQueue lock = locking.getPeerLock(peerId);
1027      if (lock.tryExclusiveLock(procedure)) {
1028        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
1029          () -> procedure + " held exclusive lock");
1030        return false;
1031      }
1032      waitProcedure(lock, procedure);
1033      logLockedResource(LockedResourceType.PEER, peerId);
1034      return true;
1035    } finally {
1036      schedUnlock();
1037    }
1038  }
1039
1040  /**
1041   * Wake the procedures waiting for the specified peer
1042   * @see #waitPeerExclusiveLock(Procedure, String)
1043   * @param procedure the procedure releasing the lock
1044   * @param peerId    the peer that has the exclusive lock
1045   */
1046  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
1047    schedLock();
1048    try {
1049      final LockAndQueue lock = locking.getPeerLock(peerId);
1050      if (lock.releaseExclusiveLock(procedure)) {
1051        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
1052          () -> procedure + " released exclusive lock");
1053        int waitingCount = wakeWaitingProcedures(lock);
1054        wakePollIfNeeded(waitingCount);
1055      }
1056    } finally {
1057      schedUnlock();
1058    }
1059  }
1060
1061  // ============================================================================
1062  // Meta Locking Helpers
1063  // ============================================================================
1064  /**
1065   * Try to acquire the exclusive lock on meta.
1066   * @see #wakeMetaExclusiveLock(Procedure)
1067   * @param procedure the procedure trying to acquire the lock
1068   * @return true if the procedure has to wait for meta to be available
1069   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1070   *             {@link RecoverMetaProcedure}.
1071   */
1072  @Deprecated
1073  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
1074    schedLock();
1075    try {
1076      final LockAndQueue lock = locking.getMetaLock();
1077      if (lock.tryExclusiveLock(procedure)) {
1078        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
1079        return false;
1080      }
1081      waitProcedure(lock, procedure);
1082      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
1083      return true;
1084    } finally {
1085      schedUnlock();
1086    }
1087  }
1088
1089  /**
1090   * Wake the procedures waiting for meta.
1091   * @see #waitMetaExclusiveLock(Procedure)
1092   * @param procedure the procedure releasing the lock
1093   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1094   *             {@link RecoverMetaProcedure}.
1095   */
1096  @Deprecated
1097  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1098    schedLock();
1099    try {
1100      final LockAndQueue lock = locking.getMetaLock();
1101      lock.releaseExclusiveLock(procedure);
1102      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1103      int waitingCount = wakeWaitingProcedures(lock);
1104      wakePollIfNeeded(waitingCount);
1105    } finally {
1106      schedUnlock();
1107    }
1108  }
1109
1110  // ============================================================================
1111  // Global Locking Helpers
1112  // ============================================================================
1113  /**
1114   * Try to acquire the share lock on global.
1115   * @see #wakeGlobalExclusiveLock(Procedure, String)
1116   * @param procedure the procedure trying to acquire the lock
1117   * @return true if the procedure has to wait for global to be available
1118   */
1119  public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
1120    schedLock();
1121    try {
1122      final LockAndQueue lock = locking.getGlobalLock(globalId);
1123      if (lock.tryExclusiveLock(procedure)) {
1124        removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId),
1125          () -> procedure + " held shared lock");
1126        return false;
1127      }
1128      waitProcedure(lock, procedure);
1129      logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING);
1130      return true;
1131    } finally {
1132      schedUnlock();
1133    }
1134  }
1135
1136  /**
1137   * Wake the procedures waiting for global.
1138   * @see #waitGlobalExclusiveLock(Procedure, String)
1139   * @param procedure the procedure releasing the lock
1140   */
1141  public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
1142    schedLock();
1143    try {
1144      final LockAndQueue lock = locking.getGlobalLock(globalId);
1145      lock.releaseExclusiveLock(procedure);
1146      addToRunQueue(globalRunQueue, getGlobalQueue(globalId),
1147        () -> procedure + " released shared lock");
1148      int waitingCount = wakeWaitingProcedures(lock);
1149      wakePollIfNeeded(waitingCount);
1150    } finally {
1151      schedUnlock();
1152    }
1153  }
1154
1155  /**
1156   * For debugging. Expensive.
1157   */
1158  public String dumpLocks() throws IOException {
1159    schedLock();
1160    try {
1161      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1162      return this.locking.toString();
1163    } finally {
1164      schedUnlock();
1165    }
1166  }
1167
1168  private void serverBucketToString(ToStringBuilder builder, String queueName, Queue<?> queue) {
1169    int size = queueSize(queue);
1170    if (size != 0) {
1171      builder.append(queueName, queue);
1172    }
1173  }
1174
1175  @Override
1176  public String toString() {
1177    ToStringBuilder builder =
1178      new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(super.toString());
1179    schedLock();
1180    try {
1181      for (int i = 0; i < serverBuckets.length; i++) {
1182        serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]);
1183      }
1184      builder.append("tableMap", tableMap);
1185      builder.append("tableWaitingMap", tableProcsWaitingEnqueue);
1186      builder.append("peerMap", peerMap);
1187      builder.append("metaMap", metaMap);
1188      builder.append("globalMap", globalMap);
1189    } finally {
1190      schedUnlock();
1191    }
1192    return builder.build();
1193  }
1194}