001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.commons.lang3.builder.ToStringBuilder;
026import org.apache.commons.lang3.builder.ToStringStyle;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.TableExistsException;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.TableNotFoundException;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
034import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
035import org.apache.hadoop.hbase.procedure2.LockAndQueue;
036import org.apache.hadoop.hbase.procedure2.LockStatus;
037import org.apache.hadoop.hbase.procedure2.LockedResource;
038import org.apache.hadoop.hbase.procedure2.LockedResourceType;
039import org.apache.hadoop.hbase.procedure2.Procedure;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
041import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
042import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
043import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the
050 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the
051 * master operations can be executed concurrently, if they are operating on different tables (e.g.
052 * two create table procedures can be performed at the same time) or against two different servers;
053 * say two servers that crashed at about the same time.
054 * <p>
 * Each procedure should implement an interface providing information for this queue. For example,
 * table-related procedures should implement TableProcedureInterface. Each procedure will be pushed
 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can
 * abort all the operations preceding a delete table, or similar.
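 * <h4>Concurrency control</h4> Concurrent access to member variables (the run queues
 * serverRunQueue, tableRunQueue, peerRunQueue, metaRunQueue and globalRunQueue; the queue maps
 * serverBuckets, tableMap, peerMap, metaMap and globalMap; and locking) is controlled by
 * schedLock(). This mainly includes:<br>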
062 * <ul>
063 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when:
064 * <ol>
065 * <li>Queue was empty before push (so must have been out of run-queue)</li>
066 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have
067 * moved Queue out of run-queue)</li>
068 * </ol>
069 * </li>
070 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when:
071 * <ol>
072 * <li>Queue becomes empty after poll</li>
073 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the
074 * procedure)</li>
075 * <li>Exclusive lock is requested but lock is not available (returns null)</li>
076 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a
077 * child</li>
078 * </ol>
079 * </li>
080 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
081 * <ol>
082 * <li>Exclusive lock</li>
083 * <li>Last shared lock (in case queue was removed because next procedure in queue required
084 * exclusive lock)</li>
085 * </ol>
086 * </li>
087 * </ul>
088 */
089@InterfaceAudience.Private
090public class MasterProcedureScheduler extends AbstractProcedureScheduler {
091  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
092
093  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
094    (n, k) -> n.compareKey((ServerName) k);
095  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
096    (n, k) -> n.compareKey((TableName) k);
097  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
098    (n, k) -> n.compareKey((String) k);
099  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
100    (n, k) -> n.compareKey((TableName) k);
101  private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR =
102    (n, k) -> n.compareKey((String) k);
103
104  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
105  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
106  private final FairQueue<String> peerRunQueue = new FairQueue<>();
107  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
108  private final FairQueue<String> globalRunQueue = new FairQueue<>();
109
110  private final ServerQueue[] serverBuckets = new ServerQueue[128];
111  private TableQueue tableMap = null;
112  private PeerQueue peerMap = null;
113  private MetaQueue metaMap = null;
114  private GlobalQueue globalMap = null;
115
116  private final SchemaLocking locking;
117
118  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
119    locking = new SchemaLocking(procedureRetriever);
120  }
121
122  @Override
123  public void yield(final Procedure proc) {
124    push(proc, false, true);
125  }
126
127  @Override
128  protected void enqueue(final Procedure proc, final boolean addFront) {
129    if (isMetaProcedure(proc)) {
130      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
131    } else if (isTableProcedure(proc)) {
132      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
133    } else if (isServerProcedure(proc)) {
134      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
135      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
136    } else if (isPeerProcedure(proc)) {
137      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
138    } else if (isGlobalProcedure(proc)) {
139      doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
140    } else {
141      // TODO: at the moment we only have Table and Server procedures
142      // if you are implementing a non-table/non-server procedure, you have two options: create
143      // a group for all the non-table/non-server procedures or try to find a key for your
144      // non-table/non-server procedures and implement something similar to the TableRunQueue.
145      throw new UnsupportedOperationException(
146        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
147    }
148  }
149
150  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
151    Procedure<?> proc, boolean addFront) {
152    queue.add(proc, addFront);
153    // For the following conditions, we will put the queue back into execution
154    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
155    // which means it can be executed immediately.
156    // 2. The exclusive lock for this queue has not been held.
157    // 3. The given procedure has the exclusive lock permission for this queue.
158    Supplier<String> reason = null;
159    if (proc.hasLock()) {
160      reason = () -> proc + " has lock";
161    } else if (proc.isLockedWhenLoading()) {
162      reason = () -> proc + " restores lock when restarting";
163    } else if (!queue.getLockStatus().hasExclusiveLock()) {
164      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
165    } else if (queue.getLockStatus().hasLockAccess(proc)) {
166      reason = () -> proc + " has the excusive lock access";
167    }
168    if (reason != null) {
169      addToRunQueue(fairq, queue, reason);
170    }
171  }
172
173  @Override
174  protected boolean queueHasRunnables() {
175    return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
176      || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
177      || peerRunQueue.hasRunnables();
178  }
179
180  @Override
181  protected Procedure dequeue() {
182    // pull global first
183    Procedure<?> pollResult = doPoll(globalRunQueue);
184    // then meta procedure
185    if (pollResult == null) {
186      pollResult = doPoll(metaRunQueue);
187    }
188    // For now, let server handling have precedence over table handling; presumption is that it
189    // is more important handling crashed servers than it is running the
190    // enabling/disabling tables, etc.
191    if (pollResult == null) {
192      pollResult = doPoll(serverRunQueue);
193    }
194    if (pollResult == null) {
195      pollResult = doPoll(peerRunQueue);
196    }
197    if (pollResult == null) {
198      pollResult = doPoll(tableRunQueue);
199    }
200    return pollResult;
201  }
202
203  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
204    LockStatus s = rq.getLockStatus();
205    // if we have the lock access, we are ready
206    if (s.hasLockAccess(proc)) {
207      return true;
208    }
209    boolean xlockReq = rq.requireExclusiveLock(proc);
210    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
211    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
212    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
213  }
214
215  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
216    Queue<T> rq = fairq.poll();
217    if (rq == null || !rq.isAvailable()) {
218      return null;
219    }
220    // loop until we find out a procedure which is ready to run, or if we have checked all the
221    // procedures, then we give up and remove the queue from run queue.
222    for (int i = 0, n = rq.size(); i < n; i++) {
223      Procedure<?> proc = rq.poll();
224      if (isLockReady(proc, rq)) {
225        // the queue is empty, remove from run queue
226        if (rq.isEmpty()) {
227          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
228        }
229        return proc;
230      }
231      // we are not ready to run, add back and try the next procedure
232      rq.add(proc, false);
233    }
234    // no procedure is ready for execution, remove from run queue
235    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
236    return null;
237  }
238
239  @Override
240  public List<LockedResource> getLocks() {
241    schedLock();
242    try {
243      return locking.getLocks();
244    } finally {
245      schedUnlock();
246    }
247  }
248
249  @Override
250  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
251    schedLock();
252    try {
253      return locking.getLockResource(resourceType, resourceName);
254    } finally {
255      schedUnlock();
256    }
257  }
258
259  @Override
260  public void clear() {
261    schedLock();
262    try {
263      clearQueue();
264      locking.clear();
265    } finally {
266      schedUnlock();
267    }
268  }
269
270  private void clearQueue() {
271    // Remove Servers
272    for (int i = 0; i < serverBuckets.length; ++i) {
273      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
274      serverBuckets[i] = null;
275    }
276
277    // Remove Tables
278    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
279    tableMap = null;
280
281    // Remove Peers
282    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
283    peerMap = null;
284
285    // Remove Meta
286    clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
287    metaMap = null;
288
289    // Remove Global
290    clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
291    globalMap = null;
292
293    assert size() == 0 : "expected queue size to be 0, got " + size();
294  }
295
296  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
297    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
298    while (treeMap != null) {
299      Queue<T> node = AvlTree.getFirst(treeMap);
300      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
301      if (fairq != null) {
302        removeFromRunQueue(fairq, node, () -> "clear all queues");
303      }
304    }
305  }
306
307  private int queueSize(Queue<?> head) {
308    int count = 0;
309    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
310    while (iter.hasNext()) {
311      count += iter.next().size();
312    }
313    return count;
314  }
315
316  @Override
317  protected int queueSize() {
318    int count = 0;
319    for (ServerQueue serverMap : serverBuckets) {
320      count += queueSize(serverMap);
321    }
322    count += queueSize(tableMap);
323    count += queueSize(peerMap);
324    count += queueSize(metaMap);
325    count += queueSize(globalMap);
326    return count;
327  }
328
329  @Override
330  public void completionCleanup(final Procedure proc) {
331    if (proc instanceof TableProcedureInterface) {
332      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
333      boolean tableDeleted;
334      if (proc.hasException()) {
335        Exception procEx = proc.getException().unwrapRemoteException();
336        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
337          // create failed because the table already exist
338          tableDeleted = !(procEx instanceof TableExistsException);
339        } else {
340          // the operation failed because the table does not exist
341          tableDeleted = (procEx instanceof TableNotFoundException);
342        }
343      } else {
344        // the table was deleted
345        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
346      }
347      if (tableDeleted) {
348        markTableAsDeleted(iProcTable.getTableName(), proc);
349        return;
350      }
351    } else if (proc instanceof PeerProcedureInterface) {
352      tryCleanupPeerQueue(getPeerId(proc), proc);
353    } else if (proc instanceof ServerProcedureInterface) {
354      tryCleanupServerQueue(getServerName(proc), proc);
355    } else {
356      // No cleanup for other procedure types, yet.
357      return;
358    }
359  }
360
361  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
362    Supplier<String> reason) {
363    if (LOG.isTraceEnabled()) {
364      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
365    }
366    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
367      fairq.add(queue);
368    }
369  }
370
371  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
372    Queue<T> queue, Supplier<String> reason) {
373    if (LOG.isTraceEnabled()) {
374      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
375    }
376    if (AvlIterableList.isLinked(queue)) {
377      fairq.remove(queue);
378    }
379  }
380
381  // ============================================================================
382  // Table Queue Lookup Helpers
383  // ============================================================================
384  private TableQueue getTableQueue(TableName tableName) {
385    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
386    if (node != null) return node;
387
388    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
389      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
390    tableMap = AvlTree.insert(tableMap, node);
391    return node;
392  }
393
394  private void removeTableQueue(TableName tableName) {
395    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
396    locking.removeTableLock(tableName);
397  }
398
399  private static boolean isTableProcedure(Procedure<?> proc) {
400    return proc instanceof TableProcedureInterface;
401  }
402
403  private static TableName getTableName(Procedure<?> proc) {
404    return ((TableProcedureInterface) proc).getTableName();
405  }
406
407  // ============================================================================
408  // Server Queue Lookup Helpers
409  // ============================================================================
410  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
411    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
412    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
413    if (node != null) {
414      return node;
415    }
416    int priority;
417    if (proc != null) {
418      priority = MasterProcedureUtil.getServerPriority(proc);
419    } else {
420      priority = 1;
421    }
422    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
423    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
424    return node;
425  }
426
427  private void removeServerQueue(ServerName serverName) {
428    int index = getBucketIndex(serverBuckets, serverName.hashCode());
429    serverBuckets[index] =
430      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
431    locking.removeServerLock(serverName);
432  }
433
434  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
435    schedLock();
436    try {
437      int index = getBucketIndex(serverBuckets, serverName.hashCode());
438      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
439      if (node == null) {
440        return;
441      }
442
443      LockAndQueue lock = locking.getServerLock(serverName);
444      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
445        removeFromRunQueue(serverRunQueue, node,
446          () -> "clean up server queue after " + proc + " completed");
447        removeServerQueue(serverName);
448      }
449    } finally {
450      schedUnlock();
451    }
452  }
453
454  private static int getBucketIndex(Object[] buckets, int hashCode) {
455    return Math.abs(hashCode) % buckets.length;
456  }
457
458  private static boolean isServerProcedure(Procedure<?> proc) {
459    return proc instanceof ServerProcedureInterface;
460  }
461
462  private static ServerName getServerName(Procedure<?> proc) {
463    return ((ServerProcedureInterface) proc).getServerName();
464  }
465
466  // ============================================================================
467  // Peer Queue Lookup Helpers
468  // ============================================================================
469  private PeerQueue getPeerQueue(String peerId) {
470    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
471    if (node != null) {
472      return node;
473    }
474    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
475    peerMap = AvlTree.insert(peerMap, node);
476    return node;
477  }
478
479  private void removePeerQueue(String peerId) {
480    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
481    locking.removePeerLock(peerId);
482  }
483
484  private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
485    schedLock();
486    try {
487      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
488      if (queue == null) {
489        return;
490      }
491
492      final LockAndQueue lock = locking.getPeerLock(peerId);
493      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
494        removeFromRunQueue(peerRunQueue, queue,
495          () -> "clean up peer queue after " + procedure + " completed");
496        removePeerQueue(peerId);
497      }
498    } finally {
499      schedUnlock();
500    }
501  }
502
503  private static boolean isPeerProcedure(Procedure<?> proc) {
504    return proc instanceof PeerProcedureInterface;
505  }
506
507  private static String getPeerId(Procedure<?> proc) {
508    return ((PeerProcedureInterface) proc).getPeerId();
509  }
510
511  // ============================================================================
512  // Meta Queue Lookup Helpers
513  // ============================================================================
514  private MetaQueue getMetaQueue() {
515    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
516    if (node != null) {
517      return node;
518    }
519    node = new MetaQueue(locking.getMetaLock());
520    metaMap = AvlTree.insert(metaMap, node);
521    return node;
522  }
523
524  private static boolean isMetaProcedure(Procedure<?> proc) {
525    return proc instanceof MetaProcedureInterface;
526  }
527
528  // ============================================================================
529  // Global Queue Lookup Helpers
530  // ============================================================================
531  private GlobalQueue getGlobalQueue(String globalId) {
532    GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
533    if (node != null) {
534      return node;
535    }
536    node = new GlobalQueue(globalId, locking.getGlobalLock(globalId));
537    globalMap = AvlTree.insert(globalMap, node);
538    return node;
539  }
540
541  private void removeGlobalQueue(String globalId) {
542    globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
543    locking.removeGlobalLock(globalId);
544  }
545
546  private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) {
547    schedLock();
548    try {
549      GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
550      if (queue == null) {
551        return;
552      }
553
554      final LockAndQueue lock = locking.getGlobalLock(globalId);
555      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
556        removeFromRunQueue(globalRunQueue, queue,
557          () -> "clean up global queue after " + procedure + " completed");
558        removeGlobalQueue(globalId);
559      }
560    } finally {
561      schedUnlock();
562    }
563  }
564
565  private static boolean isGlobalProcedure(Procedure<?> proc) {
566    return proc instanceof GlobalProcedureInterface;
567  }
568
569  private static String getGlobalId(Procedure<?> proc) {
570    return ((GlobalProcedureInterface) proc).getGlobalId();
571  }
572
573  // ============================================================================
574  // Table Locking Helpers
575  // ============================================================================
576  /**
577   * Get lock info for a resource of specified type and name and log details
578   */
579  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
580    if (!LOG.isDebugEnabled()) {
581      return;
582    }
583
584    LockedResource lockedResource = getLockResource(resourceType, resourceName);
585    if (lockedResource != null) {
586      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count="
587        + lockedResource.getSharedLockCount();
588
589      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
590      if (proc != null) {
591        msg += ", exclusively locked by procId=" + proc.getProcId();
592      }
593      LOG.debug(msg);
594    }
595  }
596
597  /**
598   * Suspend the procedure if the specified table is already locked. Other operations in the
599   * table-queue will be executed after the lock is released.
600   * @param procedure the procedure trying to acquire the lock
601   * @param table     Table to lock
602   * @return true if the procedure has to wait for the table to be available
603   */
604  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
605    schedLock();
606    try {
607      final String namespace = table.getNamespaceAsString();
608      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
609      final LockAndQueue tableLock = locking.getTableLock(table);
610      if (!namespaceLock.trySharedLock(procedure)) {
611        waitProcedure(namespaceLock, procedure);
612        logLockedResource(LockedResourceType.NAMESPACE, namespace);
613        return true;
614      }
615      if (!tableLock.tryExclusiveLock(procedure)) {
616        namespaceLock.releaseSharedLock();
617        waitProcedure(tableLock, procedure);
618        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
619        return true;
620      }
621      removeFromRunQueue(tableRunQueue, getTableQueue(table),
622        () -> procedure + " held the exclusive lock");
623      return false;
624    } finally {
625      schedUnlock();
626    }
627  }
628
629  /**
630   * Wake the procedures waiting for the specified table
631   * @param procedure the procedure releasing the lock
632   * @param table     the name of the table that has the exclusive lock
633   */
634  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
635    schedLock();
636    try {
637      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
638      final LockAndQueue tableLock = locking.getTableLock(table);
639      int waitingCount = 0;
640      if (tableLock.releaseExclusiveLock(procedure)) {
641        waitingCount += wakeWaitingProcedures(tableLock);
642      }
643      if (namespaceLock.releaseSharedLock()) {
644        waitingCount += wakeWaitingProcedures(namespaceLock);
645      }
646      addToRunQueue(tableRunQueue, getTableQueue(table),
647        () -> procedure + " released the exclusive lock");
648      wakePollIfNeeded(waitingCount);
649    } finally {
650      schedUnlock();
651    }
652  }
653
654  /**
655   * Suspend the procedure if the specified table is already locked. other "read" operations in the
656   * table-queue may be executed concurrently,
657   * @param procedure the procedure trying to acquire the lock
658   * @param table     Table to lock
659   * @return true if the procedure has to wait for the table to be available
660   */
661  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
662    return waitTableQueueSharedLock(procedure, table) == null;
663  }
664
665  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
666    schedLock();
667    try {
668      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
669      final LockAndQueue tableLock = locking.getTableLock(table);
670      if (!namespaceLock.trySharedLock(procedure)) {
671        waitProcedure(namespaceLock, procedure);
672        return null;
673      }
674
675      if (!tableLock.trySharedLock(procedure)) {
676        namespaceLock.releaseSharedLock();
677        waitProcedure(tableLock, procedure);
678        return null;
679      }
680
681      return getTableQueue(table);
682    } finally {
683      schedUnlock();
684    }
685  }
686
687  /**
688   * Wake the procedures waiting for the specified table
689   * @param procedure the procedure releasing the lock
690   * @param table     the name of the table that has the shared lock
691   */
692  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
693    schedLock();
694    try {
695      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
696      final LockAndQueue tableLock = locking.getTableLock(table);
697      int waitingCount = 0;
698      if (tableLock.releaseSharedLock()) {
699        addToRunQueue(tableRunQueue, getTableQueue(table),
700          () -> procedure + " released the shared lock");
701        waitingCount += wakeWaitingProcedures(tableLock);
702      }
703      if (namespaceLock.releaseSharedLock()) {
704        waitingCount += wakeWaitingProcedures(namespaceLock);
705      }
706      wakePollIfNeeded(waitingCount);
707    } finally {
708      schedUnlock();
709    }
710  }
711
712  /**
713   * Tries to remove the queue and the table-lock of the specified table. If there are new
714   * operations pending (e.g. a new create), the remove will not be performed.
715   * @param table     the name of the table that should be marked as deleted
716   * @param procedure the procedure that is removing the table
717   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
718   *         pending for that table (e.g. a new create).
719   */
720  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
721    schedLock();
722    try {
723      final TableQueue queue = getTableQueue(table);
724      final LockAndQueue tableLock = locking.getTableLock(table);
725      if (queue == null) return true;
726
727      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
728        // remove the table from the run-queue and the map
729        if (AvlIterableList.isLinked(queue)) {
730          tableRunQueue.remove(queue);
731        }
732        removeTableQueue(table);
733      } else {
734        // TODO: If there are no create, we can drop all the other ops
735        return false;
736      }
737    } finally {
738      schedUnlock();
739    }
740    return true;
741  }
742
743  // ============================================================================
744  // Region Locking Helpers
745  // ============================================================================
746  /**
747   * Suspend the procedure if the specified region is already locked.
748   * @param procedure  the procedure trying to acquire the lock on the region
749   * @param regionInfo the region we are trying to lock
750   * @return true if the procedure has to wait for the regions to be available
751   */
752  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
753    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
754  }
755
756  /**
757   * Suspend the procedure if the specified set of regions are already locked.
758   * @param procedure   the procedure trying to acquire the lock on the regions
759   * @param table       the table name of the regions we are trying to lock
760   * @param regionInfos the list of regions we are trying to lock
761   * @return true if the procedure has to wait for the regions to be available
762   */
763  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
764    final RegionInfo... regionInfos) {
765    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
766    schedLock();
767    try {
768      assert table != null;
769      if (waitTableSharedLock(procedure, table)) {
770        return true;
771      }
772
773      // acquire region xlocks or wait
774      boolean hasLock = true;
775      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length];
776      for (int i = 0; i < regionInfos.length; ++i) {
777        assert regionInfos[i] != null;
778        assert regionInfos[i].getTable() != null;
779        assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure;
780        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
781          : "duplicate region: " + regionInfos[i];
782
783        regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName());
784        if (!regionLocks[i].tryExclusiveLock(procedure)) {
785          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
786            regionLocks[i].getExclusiveLockProcIdOwner());
787          waitProcedure(regionLocks[i], procedure);
788          hasLock = false;
789          while (i-- > 0) {
790            regionLocks[i].releaseExclusiveLock(procedure);
791          }
792          break;
793        } else {
794          LOG.info("Took xlock for {}", procedure);
795        }
796      }
797
798      if (!hasLock) {
799        wakeTableSharedLock(procedure, table);
800      }
801      return !hasLock;
802    } finally {
803      schedUnlock();
804    }
805  }
806
807  /**
808   * Wake the procedures waiting for the specified region
809   * @param procedure  the procedure that was holding the region
810   * @param regionInfo the region the procedure was holding
811   */
812  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
813    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
814  }
815
816  /**
817   * Wake the procedures waiting for the specified regions
818   * @param procedure   the procedure that was holding the regions
819   * @param regionInfos the list of regions the procedure was holding
820   */
821  public void wakeRegions(final Procedure<?> procedure, final TableName table,
822    final RegionInfo... regionInfos) {
823    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
824    schedLock();
825    try {
826      int numProcs = 0;
827      final Procedure<?>[] nextProcs = new Procedure[regionInfos.length];
828      for (int i = 0; i < regionInfos.length; ++i) {
829        assert regionInfos[i].getTable().equals(table);
830        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
831          : "duplicate region: " + regionInfos[i];
832
833        LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName());
834        if (regionLock.releaseExclusiveLock(procedure)) {
835          if (!regionLock.isWaitingQueueEmpty()) {
836            // release one procedure at the time since regions has an xlock
837            nextProcs[numProcs++] = regionLock.removeFirst();
838          } else {
839            locking.removeRegionLock(regionInfos[i].getEncodedName());
840          }
841        }
842      }
843
844      // awake procedures if any
845      for (int i = numProcs - 1; i >= 0; --i) {
846        wakeProcedure(nextProcs[i]);
847      }
848      wakePollIfNeeded(numProcs);
849      // release the table shared-lock.
850      wakeTableSharedLock(procedure, table);
851    } finally {
852      schedUnlock();
853    }
854  }
855
856  // ============================================================================
857  // Namespace Locking Helpers
858  // ============================================================================
859  /**
860   * Suspend the procedure if the specified namespace is already locked.
861   * @see #wakeNamespaceExclusiveLock(Procedure,String)
862   * @param procedure the procedure trying to acquire the lock
863   * @param namespace Namespace to lock
864   * @return true if the procedure has to wait for the namespace to be available
865   */
866  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
867    schedLock();
868    try {
869      final LockAndQueue systemNamespaceTableLock =
870        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
871      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
872        waitProcedure(systemNamespaceTableLock, procedure);
873        logLockedResource(LockedResourceType.TABLE,
874          TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString());
875        return true;
876      }
877
878      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
879      if (!namespaceLock.tryExclusiveLock(procedure)) {
880        systemNamespaceTableLock.releaseSharedLock();
881        waitProcedure(namespaceLock, procedure);
882        logLockedResource(LockedResourceType.NAMESPACE, namespace);
883        return true;
884      }
885      return false;
886    } finally {
887      schedUnlock();
888    }
889  }
890
891  /**
892   * Wake the procedures waiting for the specified namespace
893   * @see #waitNamespaceExclusiveLock(Procedure,String)
894   * @param procedure the procedure releasing the lock
895   * @param namespace the namespace that has the exclusive lock
896   */
897  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
898    schedLock();
899    try {
900      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
901      final LockAndQueue systemNamespaceTableLock =
902        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
903      int waitingCount = 0;
904      if (namespaceLock.releaseExclusiveLock(procedure)) {
905        waitingCount += wakeWaitingProcedures(namespaceLock);
906      }
907      if (systemNamespaceTableLock.releaseSharedLock()) {
908        addToRunQueue(tableRunQueue,
909          getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME),
910          () -> procedure + " released namespace exclusive lock");
911        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
912      }
913      wakePollIfNeeded(waitingCount);
914    } finally {
915      schedUnlock();
916    }
917  }
918
919  // ============================================================================
920  // Server Locking Helpers
921  // ============================================================================
922  /**
923   * Try to acquire the exclusive lock on the specified server.
924   * @see #wakeServerExclusiveLock(Procedure,ServerName)
925   * @param procedure  the procedure trying to acquire the lock
926   * @param serverName Server to lock
927   * @return true if the procedure has to wait for the server to be available
928   */
929  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
930    final ServerName serverName) {
931    schedLock();
932    try {
933      final LockAndQueue lock = locking.getServerLock(serverName);
934      if (lock.tryExclusiveLock(procedure)) {
935        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
936        // so.
937        removeFromRunQueue(serverRunQueue,
938          getServerQueue(serverName,
939            procedure instanceof ServerProcedureInterface
940              ? (ServerProcedureInterface) procedure
941              : null),
942          () -> procedure + " held exclusive lock");
943        return false;
944      }
945      waitProcedure(lock, procedure);
946      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
947      return true;
948    } finally {
949      schedUnlock();
950    }
951  }
952
953  /**
954   * Wake the procedures waiting for the specified server
955   * @see #waitServerExclusiveLock(Procedure,ServerName)
956   * @param procedure  the procedure releasing the lock
957   * @param serverName the server that has the exclusive lock
958   */
959  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
960    schedLock();
961    try {
962      final LockAndQueue lock = locking.getServerLock(serverName);
963      // Only SCP will acquire/release server lock so do not need to check the return value here.
964      lock.releaseExclusiveLock(procedure);
965      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
966      // so.
967      addToRunQueue(serverRunQueue,
968        getServerQueue(serverName,
969          procedure instanceof ServerProcedureInterface
970            ? (ServerProcedureInterface) procedure
971            : null),
972        () -> procedure + " released exclusive lock");
973      int waitingCount = wakeWaitingProcedures(lock);
974      wakePollIfNeeded(waitingCount);
975    } finally {
976      schedUnlock();
977    }
978  }
979
980  // ============================================================================
981  // Peer Locking Helpers
982  // ============================================================================
983  /**
984   * Try to acquire the exclusive lock on the specified peer.
985   * @see #wakePeerExclusiveLock(Procedure, String)
986   * @param procedure the procedure trying to acquire the lock
987   * @param peerId    peer to lock
988   * @return true if the procedure has to wait for the peer to be available
989   */
990  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
991    schedLock();
992    try {
993      final LockAndQueue lock = locking.getPeerLock(peerId);
994      if (lock.tryExclusiveLock(procedure)) {
995        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
996          () -> procedure + " held exclusive lock");
997        return false;
998      }
999      waitProcedure(lock, procedure);
1000      logLockedResource(LockedResourceType.PEER, peerId);
1001      return true;
1002    } finally {
1003      schedUnlock();
1004    }
1005  }
1006
1007  /**
1008   * Wake the procedures waiting for the specified peer
1009   * @see #waitPeerExclusiveLock(Procedure, String)
1010   * @param procedure the procedure releasing the lock
1011   * @param peerId    the peer that has the exclusive lock
1012   */
1013  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
1014    schedLock();
1015    try {
1016      final LockAndQueue lock = locking.getPeerLock(peerId);
1017      if (lock.releaseExclusiveLock(procedure)) {
1018        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
1019          () -> procedure + " released exclusive lock");
1020        int waitingCount = wakeWaitingProcedures(lock);
1021        wakePollIfNeeded(waitingCount);
1022      }
1023    } finally {
1024      schedUnlock();
1025    }
1026  }
1027
1028  // ============================================================================
1029  // Meta Locking Helpers
1030  // ============================================================================
1031  /**
1032   * Try to acquire the exclusive lock on meta.
1033   * @see #wakeMetaExclusiveLock(Procedure)
1034   * @param procedure the procedure trying to acquire the lock
1035   * @return true if the procedure has to wait for meta to be available
1036   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1037   *             {@link RecoverMetaProcedure}.
1038   */
1039  @Deprecated
1040  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
1041    schedLock();
1042    try {
1043      final LockAndQueue lock = locking.getMetaLock();
1044      if (lock.tryExclusiveLock(procedure)) {
1045        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
1046        return false;
1047      }
1048      waitProcedure(lock, procedure);
1049      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
1050      return true;
1051    } finally {
1052      schedUnlock();
1053    }
1054  }
1055
1056  /**
1057   * Wake the procedures waiting for meta.
1058   * @see #waitMetaExclusiveLock(Procedure)
1059   * @param procedure the procedure releasing the lock
1060   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1061   *             {@link RecoverMetaProcedure}.
1062   */
1063  @Deprecated
1064  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1065    schedLock();
1066    try {
1067      final LockAndQueue lock = locking.getMetaLock();
1068      lock.releaseExclusiveLock(procedure);
1069      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1070      int waitingCount = wakeWaitingProcedures(lock);
1071      wakePollIfNeeded(waitingCount);
1072    } finally {
1073      schedUnlock();
1074    }
1075  }
1076
1077  // ============================================================================
1078  // Global Locking Helpers
1079  // ============================================================================
1080  /**
1081   * Try to acquire the share lock on global.
1082   * @see #wakeGlobalExclusiveLock(Procedure, String)
1083   * @param procedure the procedure trying to acquire the lock
1084   * @return true if the procedure has to wait for global to be available
1085   */
1086  public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
1087    schedLock();
1088    try {
1089      final LockAndQueue lock = locking.getGlobalLock(globalId);
1090      if (lock.tryExclusiveLock(procedure)) {
1091        removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId),
1092          () -> procedure + " held shared lock");
1093        return false;
1094      }
1095      waitProcedure(lock, procedure);
1096      logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING);
1097      return true;
1098    } finally {
1099      schedUnlock();
1100    }
1101  }
1102
1103  /**
1104   * Wake the procedures waiting for global.
1105   * @see #waitGlobalExclusiveLock(Procedure, String)
1106   * @param procedure the procedure releasing the lock
1107   */
1108  public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
1109    schedLock();
1110    try {
1111      final LockAndQueue lock = locking.getGlobalLock(globalId);
1112      lock.releaseExclusiveLock(procedure);
1113      addToRunQueue(globalRunQueue, getGlobalQueue(globalId),
1114        () -> procedure + " released shared lock");
1115      int waitingCount = wakeWaitingProcedures(lock);
1116      wakePollIfNeeded(waitingCount);
1117    } finally {
1118      schedUnlock();
1119    }
1120  }
1121
1122  /**
1123   * For debugging. Expensive.
1124   */
1125  public String dumpLocks() throws IOException {
1126    schedLock();
1127    try {
1128      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1129      return this.locking.toString();
1130    } finally {
1131      schedUnlock();
1132    }
1133  }
1134
1135  private void serverBucketToString(ToStringBuilder builder, String queueName, Queue<?> queue) {
1136    int size = queueSize(queue);
1137    if (size != 0) {
1138      builder.append(queueName, queue);
1139    }
1140  }
1141
1142  @Override
1143  public String toString() {
1144    ToStringBuilder builder =
1145      new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(super.toString());
1146    schedLock();
1147    try {
1148      for (int i = 0; i < serverBuckets.length; i++) {
1149        serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]);
1150      }
1151      builder.append("tableMap", tableMap);
1152      builder.append("peerMap", peerMap);
1153      builder.append("metaMap", metaMap);
1154      builder.append("globalMap", globalMap);
1155    } finally {
1156      schedUnlock();
1157    }
1158    return builder.build();
1159  }
1160}