001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableExistsException;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
031import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
032import org.apache.hadoop.hbase.procedure2.LockAndQueue;
033import org.apache.hadoop.hbase.procedure2.LockStatus;
034import org.apache.hadoop.hbase.procedure2.LockedResource;
035import org.apache.hadoop.hbase.procedure2.LockedResourceType;
036import org.apache.hadoop.hbase.procedure2.Procedure;
037import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
038import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
039import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the
047 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the
048 * master operations can be executed concurrently, if they are operating on different tables (e.g.
049 * two create table procedures can be performed at the same time) or against two different servers;
050 * say two servers that crashed at about the same time.
051 * <p>
052 * Each procedure should implement an Interface providing information for this queue. For example
053 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed
054 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can
055 * abort all the operations preceding a delete table, or similar.
056 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue,
057 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). This mainly
058 * includes:<br>
059 * <ul>
060 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when:
061 * <ol>
062 * <li>Queue was empty before push (so must have been out of run-queue)</li>
063 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have
064 * moved Queue out of run-queue)</li>
065 * </ol>
066 * </li>
067 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when:
068 * <ol>
069 * <li>Queue becomes empty after poll</li>
070 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the
071 * procedure)</li>
072 * <li>Exclusive lock is requested but lock is not available (returns null)</li>
073 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a
074 * child</li>
075 * </ol>
076 * </li>
077 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
078 * <ol>
079 * <li>Exclusive lock</li>
080 * <li>Last shared lock (in case queue was removed because next procedure in queue required
081 * exclusive lock)</li>
082 * </ol>
083 * </li>
084 * </ul>
085 */
086@InterfaceAudience.Private
087public class MasterProcedureScheduler extends AbstractProcedureScheduler {
088  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
089
090  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
091    (n, k) -> n.compareKey((ServerName) k);
092  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
093    (n, k) -> n.compareKey((TableName) k);
094  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
095    (n, k) -> n.compareKey((String) k);
096  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
097    (n, k) -> n.compareKey((TableName) k);
098
099  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
100  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
101  private final FairQueue<String> peerRunQueue = new FairQueue<>();
102  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
103
104  private final ServerQueue[] serverBuckets = new ServerQueue[128];
105  private TableQueue tableMap = null;
106  private PeerQueue peerMap = null;
107  private MetaQueue metaMap = null;
108
109  private final SchemaLocking locking;
110
111  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
112    locking = new SchemaLocking(procedureRetriever);
113  }
114
115  @Override
116  public void yield(final Procedure proc) {
117    push(proc, false, true);
118  }
119
120  @Override
121  protected void enqueue(final Procedure proc, final boolean addFront) {
122    if (isMetaProcedure(proc)) {
123      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
124    } else if (isTableProcedure(proc)) {
125      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
126    } else if (isServerProcedure(proc)) {
127      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
128      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
129    } else if (isPeerProcedure(proc)) {
130      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
131    } else {
132      // TODO: at the moment we only have Table and Server procedures
133      // if you are implementing a non-table/non-server procedure, you have two options: create
134      // a group for all the non-table/non-server procedures or try to find a key for your
135      // non-table/non-server procedures and implement something similar to the TableRunQueue.
136      throw new UnsupportedOperationException(
137        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
138    }
139  }
140
141  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
142    Procedure<?> proc, boolean addFront) {
143    queue.add(proc, addFront);
144    // For the following conditions, we will put the queue back into execution
145    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
146    // which means it can be executed immediately.
147    // 2. The exclusive lock for this queue has not been held.
148    // 3. The given procedure has the exclusive lock permission for this queue.
149    Supplier<String> reason = null;
150    if (proc.hasLock()) {
151      reason = () -> proc + " has lock";
152    } else if (proc.isLockedWhenLoading()) {
153      reason = () -> proc + " restores lock when restarting";
154    } else if (!queue.getLockStatus().hasExclusiveLock()) {
155      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
156    } else if (queue.getLockStatus().hasLockAccess(proc)) {
157      reason = () -> proc + " has the excusive lock access";
158    }
159    if (reason != null) {
160      addToRunQueue(fairq, queue, reason);
161    }
162  }
163
164  @Override
165  protected boolean queueHasRunnables() {
166    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables()
167      || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
168  }
169
170  @Override
171  protected Procedure dequeue() {
172    // meta procedure is always the first priority
173    Procedure<?> pollResult = doPoll(metaRunQueue);
174    // For now, let server handling have precedence over table handling; presumption is that it
175    // is more important handling crashed servers than it is running the
176    // enabling/disabling tables, etc.
177    if (pollResult == null) {
178      pollResult = doPoll(serverRunQueue);
179    }
180    if (pollResult == null) {
181      pollResult = doPoll(peerRunQueue);
182    }
183    if (pollResult == null) {
184      pollResult = doPoll(tableRunQueue);
185    }
186    return pollResult;
187  }
188
189  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
190    LockStatus s = rq.getLockStatus();
191    // if we have the lock access, we are ready
192    if (s.hasLockAccess(proc)) {
193      return true;
194    }
195    boolean xlockReq = rq.requireExclusiveLock(proc);
196    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
197    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
198    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
199  }
200
201  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
202    Queue<T> rq = fairq.poll();
203    if (rq == null || !rq.isAvailable()) {
204      return null;
205    }
206    // loop until we find out a procedure which is ready to run, or if we have checked all the
207    // procedures, then we give up and remove the queue from run queue.
208    for (int i = 0, n = rq.size(); i < n; i++) {
209      Procedure<?> proc = rq.poll();
210      if (isLockReady(proc, rq)) {
211        // the queue is empty, remove from run queue
212        if (rq.isEmpty()) {
213          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
214        }
215        return proc;
216      }
217      // we are not ready to run, add back and try the next procedure
218      rq.add(proc, false);
219    }
220    // no procedure is ready for execution, remove from run queue
221    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
222    return null;
223  }
224
225  @Override
226  public List<LockedResource> getLocks() {
227    schedLock();
228    try {
229      return locking.getLocks();
230    } finally {
231      schedUnlock();
232    }
233  }
234
235  @Override
236  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
237    schedLock();
238    try {
239      return locking.getLockResource(resourceType, resourceName);
240    } finally {
241      schedUnlock();
242    }
243  }
244
245  @Override
246  public void clear() {
247    schedLock();
248    try {
249      clearQueue();
250      locking.clear();
251    } finally {
252      schedUnlock();
253    }
254  }
255
256  private void clearQueue() {
257    // Remove Servers
258    for (int i = 0; i < serverBuckets.length; ++i) {
259      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
260      serverBuckets[i] = null;
261    }
262
263    // Remove Tables
264    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
265    tableMap = null;
266
267    // Remove Peers
268    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
269    peerMap = null;
270
271    assert size() == 0 : "expected queue size to be 0, got " + size();
272  }
273
274  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
275    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
276    while (treeMap != null) {
277      Queue<T> node = AvlTree.getFirst(treeMap);
278      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
279      if (fairq != null) {
280        removeFromRunQueue(fairq, node, () -> "clear all queues");
281      }
282    }
283  }
284
285  private int queueSize(Queue<?> head) {
286    int count = 0;
287    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
288    while (iter.hasNext()) {
289      count += iter.next().size();
290    }
291    return count;
292  }
293
294  @Override
295  protected int queueSize() {
296    int count = 0;
297    for (ServerQueue serverMap : serverBuckets) {
298      count += queueSize(serverMap);
299    }
300    count += queueSize(tableMap);
301    count += queueSize(peerMap);
302    count += queueSize(metaMap);
303    return count;
304  }
305
306  @Override
307  public void completionCleanup(final Procedure proc) {
308    if (proc instanceof TableProcedureInterface) {
309      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
310      boolean tableDeleted;
311      if (proc.hasException()) {
312        Exception procEx = proc.getException().unwrapRemoteException();
313        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
314          // create failed because the table already exist
315          tableDeleted = !(procEx instanceof TableExistsException);
316        } else {
317          // the operation failed because the table does not exist
318          tableDeleted = (procEx instanceof TableNotFoundException);
319        }
320      } else {
321        // the table was deleted
322        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
323      }
324      if (tableDeleted) {
325        markTableAsDeleted(iProcTable.getTableName(), proc);
326        return;
327      }
328    } else if (proc instanceof PeerProcedureInterface) {
329      tryCleanupPeerQueue(getPeerId(proc), proc);
330    } else if (proc instanceof ServerProcedureInterface) {
331      tryCleanupServerQueue(getServerName(proc), proc);
332    } else {
333      // No cleanup for other procedure types, yet.
334      return;
335    }
336  }
337
338  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
339    Supplier<String> reason) {
340    if (LOG.isTraceEnabled()) {
341      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
342    }
343    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
344      fairq.add(queue);
345    }
346  }
347
348  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
349    Queue<T> queue, Supplier<String> reason) {
350    if (LOG.isTraceEnabled()) {
351      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
352    }
353    if (AvlIterableList.isLinked(queue)) {
354      fairq.remove(queue);
355    }
356  }
357
358  // ============================================================================
359  // Table Queue Lookup Helpers
360  // ============================================================================
361  private TableQueue getTableQueue(TableName tableName) {
362    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
363    if (node != null) return node;
364
365    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
366      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
367    tableMap = AvlTree.insert(tableMap, node);
368    return node;
369  }
370
371  private void removeTableQueue(TableName tableName) {
372    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
373    locking.removeTableLock(tableName);
374  }
375
376  private static boolean isTableProcedure(Procedure<?> proc) {
377    return proc instanceof TableProcedureInterface;
378  }
379
380  private static TableName getTableName(Procedure<?> proc) {
381    return ((TableProcedureInterface) proc).getTableName();
382  }
383
384  // ============================================================================
385  // Server Queue Lookup Helpers
386  // ============================================================================
387  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
388    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
389    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
390    if (node != null) {
391      return node;
392    }
393    int priority;
394    if (proc != null) {
395      priority = MasterProcedureUtil.getServerPriority(proc);
396    } else {
397      priority = 1;
398    }
399    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
400    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
401    return node;
402  }
403
404  private void removeServerQueue(ServerName serverName) {
405    int index = getBucketIndex(serverBuckets, serverName.hashCode());
406    serverBuckets[index] =
407      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
408    locking.removeServerLock(serverName);
409  }
410
411  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
412    schedLock();
413    try {
414      int index = getBucketIndex(serverBuckets, serverName.hashCode());
415      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
416      if (node == null) {
417        return;
418      }
419
420      LockAndQueue lock = locking.getServerLock(serverName);
421      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
422        removeFromRunQueue(serverRunQueue, node,
423          () -> "clean up server queue after " + proc + " completed");
424        removeServerQueue(serverName);
425      }
426    } finally {
427      schedUnlock();
428    }
429  }
430
431  private static int getBucketIndex(Object[] buckets, int hashCode) {
432    return Math.abs(hashCode) % buckets.length;
433  }
434
435  private static boolean isServerProcedure(Procedure<?> proc) {
436    return proc instanceof ServerProcedureInterface;
437  }
438
439  private static ServerName getServerName(Procedure<?> proc) {
440    return ((ServerProcedureInterface) proc).getServerName();
441  }
442
443  // ============================================================================
444  // Peer Queue Lookup Helpers
445  // ============================================================================
446  private PeerQueue getPeerQueue(String peerId) {
447    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
448    if (node != null) {
449      return node;
450    }
451    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
452    peerMap = AvlTree.insert(peerMap, node);
453    return node;
454  }
455
456  private void removePeerQueue(String peerId) {
457    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
458    locking.removePeerLock(peerId);
459  }
460
461  private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
462    schedLock();
463    try {
464      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
465      if (queue == null) {
466        return;
467      }
468
469      final LockAndQueue lock = locking.getPeerLock(peerId);
470      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
471        removeFromRunQueue(peerRunQueue, queue,
472          () -> "clean up peer queue after " + procedure + " completed");
473        removePeerQueue(peerId);
474      }
475    } finally {
476      schedUnlock();
477    }
478  }
479
480  private static boolean isPeerProcedure(Procedure<?> proc) {
481    return proc instanceof PeerProcedureInterface;
482  }
483
484  private static String getPeerId(Procedure<?> proc) {
485    return ((PeerProcedureInterface) proc).getPeerId();
486  }
487
488  // ============================================================================
489  // Meta Queue Lookup Helpers
490  // ============================================================================
491  private MetaQueue getMetaQueue() {
492    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
493    if (node != null) {
494      return node;
495    }
496    node = new MetaQueue(locking.getMetaLock());
497    metaMap = AvlTree.insert(metaMap, node);
498    return node;
499  }
500
501  private static boolean isMetaProcedure(Procedure<?> proc) {
502    return proc instanceof MetaProcedureInterface;
503  }
504
505  // ============================================================================
506  // Table Locking Helpers
507  // ============================================================================
508  /**
509   * Get lock info for a resource of specified type and name and log details
510   */
511  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
512    if (!LOG.isDebugEnabled()) {
513      return;
514    }
515
516    LockedResource lockedResource = getLockResource(resourceType, resourceName);
517    if (lockedResource != null) {
518      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count="
519        + lockedResource.getSharedLockCount();
520
521      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
522      if (proc != null) {
523        msg += ", exclusively locked by procId=" + proc.getProcId();
524      }
525      LOG.debug(msg);
526    }
527  }
528
529  /**
530   * Suspend the procedure if the specified table is already locked. Other operations in the
531   * table-queue will be executed after the lock is released.
532   * @param procedure the procedure trying to acquire the lock
533   * @param table     Table to lock
534   * @return true if the procedure has to wait for the table to be available
535   */
536  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
537    schedLock();
538    try {
539      final String namespace = table.getNamespaceAsString();
540      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
541      final LockAndQueue tableLock = locking.getTableLock(table);
542      if (!namespaceLock.trySharedLock(procedure)) {
543        waitProcedure(namespaceLock, procedure);
544        logLockedResource(LockedResourceType.NAMESPACE, namespace);
545        return true;
546      }
547      if (!tableLock.tryExclusiveLock(procedure)) {
548        namespaceLock.releaseSharedLock();
549        waitProcedure(tableLock, procedure);
550        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
551        return true;
552      }
553      removeFromRunQueue(tableRunQueue, getTableQueue(table),
554        () -> procedure + " held the exclusive lock");
555      return false;
556    } finally {
557      schedUnlock();
558    }
559  }
560
561  /**
562   * Wake the procedures waiting for the specified table
563   * @param procedure the procedure releasing the lock
564   * @param table     the name of the table that has the exclusive lock
565   */
566  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
567    schedLock();
568    try {
569      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
570      final LockAndQueue tableLock = locking.getTableLock(table);
571      int waitingCount = 0;
572      if (tableLock.releaseExclusiveLock(procedure)) {
573        waitingCount += wakeWaitingProcedures(tableLock);
574      }
575      if (namespaceLock.releaseSharedLock()) {
576        waitingCount += wakeWaitingProcedures(namespaceLock);
577      }
578      addToRunQueue(tableRunQueue, getTableQueue(table),
579        () -> procedure + " released the exclusive lock");
580      wakePollIfNeeded(waitingCount);
581    } finally {
582      schedUnlock();
583    }
584  }
585
586  /**
587   * Suspend the procedure if the specified table is already locked. other "read" operations in the
588   * table-queue may be executed concurrently,
589   * @param procedure the procedure trying to acquire the lock
590   * @param table     Table to lock
591   * @return true if the procedure has to wait for the table to be available
592   */
593  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
594    return waitTableQueueSharedLock(procedure, table) == null;
595  }
596
597  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
598    schedLock();
599    try {
600      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
601      final LockAndQueue tableLock = locking.getTableLock(table);
602      if (!namespaceLock.trySharedLock(procedure)) {
603        waitProcedure(namespaceLock, procedure);
604        return null;
605      }
606
607      if (!tableLock.trySharedLock(procedure)) {
608        namespaceLock.releaseSharedLock();
609        waitProcedure(tableLock, procedure);
610        return null;
611      }
612
613      return getTableQueue(table);
614    } finally {
615      schedUnlock();
616    }
617  }
618
619  /**
620   * Wake the procedures waiting for the specified table
621   * @param procedure the procedure releasing the lock
622   * @param table     the name of the table that has the shared lock
623   */
624  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
625    schedLock();
626    try {
627      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
628      final LockAndQueue tableLock = locking.getTableLock(table);
629      int waitingCount = 0;
630      if (tableLock.releaseSharedLock()) {
631        addToRunQueue(tableRunQueue, getTableQueue(table),
632          () -> procedure + " released the shared lock");
633        waitingCount += wakeWaitingProcedures(tableLock);
634      }
635      if (namespaceLock.releaseSharedLock()) {
636        waitingCount += wakeWaitingProcedures(namespaceLock);
637      }
638      wakePollIfNeeded(waitingCount);
639    } finally {
640      schedUnlock();
641    }
642  }
643
644  /**
645   * Tries to remove the queue and the table-lock of the specified table. If there are new
646   * operations pending (e.g. a new create), the remove will not be performed.
647   * @param table     the name of the table that should be marked as deleted
648   * @param procedure the procedure that is removing the table
649   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
650   *         pending for that table (e.g. a new create).
651   */
652  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
653    schedLock();
654    try {
655      final TableQueue queue = getTableQueue(table);
656      final LockAndQueue tableLock = locking.getTableLock(table);
657      if (queue == null) return true;
658
659      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
660        // remove the table from the run-queue and the map
661        if (AvlIterableList.isLinked(queue)) {
662          tableRunQueue.remove(queue);
663        }
664        removeTableQueue(table);
665      } else {
666        // TODO: If there are no create, we can drop all the other ops
667        return false;
668      }
669    } finally {
670      schedUnlock();
671    }
672    return true;
673  }
674
675  // ============================================================================
676  // Region Locking Helpers
677  // ============================================================================
678  /**
679   * Suspend the procedure if the specified region is already locked.
680   * @param procedure  the procedure trying to acquire the lock on the region
681   * @param regionInfo the region we are trying to lock
682   * @return true if the procedure has to wait for the regions to be available
683   */
684  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
685    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
686  }
687
  /**
   * Suspend the procedure if the specified set of regions are already locked.
   * <p>
   * {@code regionInfos} is sorted in place with {@link RegionInfo#COMPARATOR}, so the caller's
   * array is reordered. A shared lock on {@code table} is taken first, then each region's
   * exclusive lock is tried in sorted order. If a region lock is unavailable, the procedure is
   * parked on that lock and the region locks taken earlier in this call are released. The table
   * shared lock is then released through {@link #wakeTableSharedLock(Procedure, TableName)}.
   * @param procedure   the procedure trying to acquire the lock on the regions
   * @param table       the table name of the regions we are trying to lock
   * @param regionInfos the list of regions we are trying to lock
   * @return true if the procedure has to wait for the regions to be available
   */
  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
    final RegionInfo... regionInfos) {
    // a fixed acquisition order for every caller - presumably to avoid lock-order deadlocks
    // between procedures locking overlapping region sets
    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
    schedLock();
    try {
      assert table != null;
      // region xlocks are only attempted once the table shared lock is held
      if (waitTableSharedLock(procedure, table)) {
        return true;
      }

      // acquire region xlocks or wait
      boolean hasLock = true;
      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length];
      for (int i = 0; i < regionInfos.length; ++i) {
        assert regionInfos[i] != null;
        assert regionInfos[i].getTable() != null;
        assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure;
        // NOTE(review): this compares references, so two distinct but equal RegionInfo
        // instances are not reported as duplicates.
        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
          : "duplicate region: " + regionInfos[i];

        regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName());
        if (!regionLocks[i].tryExclusiveLock(procedure)) {
          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
            regionLocks[i].getExclusiveLockProcIdOwner());
          waitProcedure(regionLocks[i], procedure);
          hasLock = false;
          // roll back the region xlocks taken earlier in this loop (indices i-1 down to 0)
          while (i-- > 0) {
            regionLocks[i].releaseExclusiveLock(procedure);
          }
          break;
        } else {
          LOG.info("Took xlock for {}", procedure);
        }
      }

      // on failure give back the table shared lock; wakeTableSharedLock also wakes its waiters
      // when that was the last shared holder
      if (!hasLock) {
        wakeTableSharedLock(procedure, table);
      }
      return !hasLock;
    } finally {
      schedUnlock();
    }
  }
738
739  /**
740   * Wake the procedures waiting for the specified region
741   * @param procedure  the procedure that was holding the region
742   * @param regionInfo the region the procedure was holding
743   */
744  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
745    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
746  }
747
  /**
   * Wake the procedures waiting for the specified regions
   * <p>
   * {@code regionInfos} is sorted in place. Each region lock is handled according to whether
   * {@code procedure} releases it:
   * <ul>
   * <li>if released and procedures are waiting, only the first waiter is taken to be woken;</li>
   * <li>if released and no procedure is waiting, the region lock is removed from the registry.</li>
   * </ul>
   * Finally the table shared lock is released via
   * {@link #wakeTableSharedLock(Procedure, TableName)}.
   * @param procedure   the procedure that was holding the regions
   * @param table       the table the regions belong to
   * @param regionInfos the list of regions the procedure was holding
   */
  public void wakeRegions(final Procedure<?> procedure, final TableName table,
    final RegionInfo... regionInfos) {
    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
    schedLock();
    try {
      int numProcs = 0;
      final Procedure<?>[] nextProcs = new Procedure[regionInfos.length];
      for (int i = 0; i < regionInfos.length; ++i) {
        assert regionInfos[i].getTable().equals(table);
        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
          : "duplicate region: " + regionInfos[i];

        LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName());
        if (regionLock.releaseExclusiveLock(procedure)) {
          if (!regionLock.isWaitingQueueEmpty()) {
            // release one procedure at the time since regions has an xlock
            nextProcs[numProcs++] = regionLock.removeFirst();
          } else {
            // nobody is waiting: drop the lock object from the registry
            locking.removeRegionLock(regionInfos[i].getEncodedName());
          }
        }
      }

      // awake procedures if any
      // NOTE(review): woken in reverse collection order - presumably because wakeProcedure
      // inserts at the front, which keeps region order; confirm in AbstractProcedureScheduler.
      for (int i = numProcs - 1; i >= 0; --i) {
        wakeProcedure(nextProcs[i]);
      }
      wakePollIfNeeded(numProcs);
      // release the table shared-lock.
      wakeTableSharedLock(procedure, table);
    } finally {
      schedUnlock();
    }
  }
787
788  // ============================================================================
789  // Namespace Locking Helpers
790  // ============================================================================
791  /**
792   * Suspend the procedure if the specified namespace is already locked.
793   * @see #wakeNamespaceExclusiveLock(Procedure,String)
794   * @param procedure the procedure trying to acquire the lock
795   * @param namespace Namespace to lock
796   * @return true if the procedure has to wait for the namespace to be available
797   */
798  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
799    schedLock();
800    try {
801      final LockAndQueue systemNamespaceTableLock =
802        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
803      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
804        waitProcedure(systemNamespaceTableLock, procedure);
805        logLockedResource(LockedResourceType.TABLE,
806          TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString());
807        return true;
808      }
809
810      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
811      if (!namespaceLock.tryExclusiveLock(procedure)) {
812        systemNamespaceTableLock.releaseSharedLock();
813        waitProcedure(namespaceLock, procedure);
814        logLockedResource(LockedResourceType.NAMESPACE, namespace);
815        return true;
816      }
817      return false;
818    } finally {
819      schedUnlock();
820    }
821  }
822
823  /**
824   * Wake the procedures waiting for the specified namespace
825   * @see #waitNamespaceExclusiveLock(Procedure,String)
826   * @param procedure the procedure releasing the lock
827   * @param namespace the namespace that has the exclusive lock
828   */
829  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
830    schedLock();
831    try {
832      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
833      final LockAndQueue systemNamespaceTableLock =
834        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
835      int waitingCount = 0;
836      if (namespaceLock.releaseExclusiveLock(procedure)) {
837        waitingCount += wakeWaitingProcedures(namespaceLock);
838      }
839      if (systemNamespaceTableLock.releaseSharedLock()) {
840        addToRunQueue(tableRunQueue,
841          getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME),
842          () -> procedure + " released namespace exclusive lock");
843        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
844      }
845      wakePollIfNeeded(waitingCount);
846    } finally {
847      schedUnlock();
848    }
849  }
850
851  // ============================================================================
852  // Server Locking Helpers
853  // ============================================================================
854  /**
855   * Try to acquire the exclusive lock on the specified server.
856   * @see #wakeServerExclusiveLock(Procedure,ServerName)
857   * @param procedure  the procedure trying to acquire the lock
858   * @param serverName Server to lock
859   * @return true if the procedure has to wait for the server to be available
860   */
861  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
862    final ServerName serverName) {
863    schedLock();
864    try {
865      final LockAndQueue lock = locking.getServerLock(serverName);
866      if (lock.tryExclusiveLock(procedure)) {
867        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
868        // so.
869        removeFromRunQueue(serverRunQueue,
870          getServerQueue(serverName,
871            procedure instanceof ServerProcedureInterface
872              ? (ServerProcedureInterface) procedure
873              : null),
874          () -> procedure + " held exclusive lock");
875        return false;
876      }
877      waitProcedure(lock, procedure);
878      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
879      return true;
880    } finally {
881      schedUnlock();
882    }
883  }
884
885  /**
886   * Wake the procedures waiting for the specified server
887   * @see #waitServerExclusiveLock(Procedure,ServerName)
888   * @param procedure  the procedure releasing the lock
889   * @param serverName the server that has the exclusive lock
890   */
891  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
892    schedLock();
893    try {
894      final LockAndQueue lock = locking.getServerLock(serverName);
895      // Only SCP will acquire/release server lock so do not need to check the return value here.
896      lock.releaseExclusiveLock(procedure);
897      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
898      // so.
899      addToRunQueue(serverRunQueue,
900        getServerQueue(serverName,
901          procedure instanceof ServerProcedureInterface
902            ? (ServerProcedureInterface) procedure
903            : null),
904        () -> procedure + " released exclusive lock");
905      int waitingCount = wakeWaitingProcedures(lock);
906      wakePollIfNeeded(waitingCount);
907    } finally {
908      schedUnlock();
909    }
910  }
911
912  // ============================================================================
913  // Peer Locking Helpers
914  // ============================================================================
915  /**
916   * Try to acquire the exclusive lock on the specified peer.
917   * @see #wakePeerExclusiveLock(Procedure, String)
918   * @param procedure the procedure trying to acquire the lock
919   * @param peerId    peer to lock
920   * @return true if the procedure has to wait for the peer to be available
921   */
922  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
923    schedLock();
924    try {
925      final LockAndQueue lock = locking.getPeerLock(peerId);
926      if (lock.tryExclusiveLock(procedure)) {
927        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
928          () -> procedure + " held exclusive lock");
929        return false;
930      }
931      waitProcedure(lock, procedure);
932      logLockedResource(LockedResourceType.PEER, peerId);
933      return true;
934    } finally {
935      schedUnlock();
936    }
937  }
938
939  /**
940   * Wake the procedures waiting for the specified peer
941   * @see #waitPeerExclusiveLock(Procedure, String)
942   * @param procedure the procedure releasing the lock
943   * @param peerId    the peer that has the exclusive lock
944   */
945  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
946    schedLock();
947    try {
948      final LockAndQueue lock = locking.getPeerLock(peerId);
949      if (lock.releaseExclusiveLock(procedure)) {
950        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
951          () -> procedure + " released exclusive lock");
952        int waitingCount = wakeWaitingProcedures(lock);
953        wakePollIfNeeded(waitingCount);
954      }
955    } finally {
956      schedUnlock();
957    }
958  }
959
960  // ============================================================================
961  // Meta Locking Helpers
962  // ============================================================================
963  /**
964   * Try to acquire the exclusive lock on meta.
965   * @see #wakeMetaExclusiveLock(Procedure)
966   * @param procedure the procedure trying to acquire the lock
967   * @return true if the procedure has to wait for meta to be available
968   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
969   *             {@link RecoverMetaProcedure}.
970   */
971  @Deprecated
972  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
973    schedLock();
974    try {
975      final LockAndQueue lock = locking.getMetaLock();
976      if (lock.tryExclusiveLock(procedure)) {
977        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
978        return false;
979      }
980      waitProcedure(lock, procedure);
981      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
982      return true;
983    } finally {
984      schedUnlock();
985    }
986  }
987
988  /**
989   * Wake the procedures waiting for meta.
990   * @see #waitMetaExclusiveLock(Procedure)
991   * @param procedure the procedure releasing the lock
992   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
993   *             {@link RecoverMetaProcedure}.
994   */
995  @Deprecated
996  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
997    schedLock();
998    try {
999      final LockAndQueue lock = locking.getMetaLock();
1000      lock.releaseExclusiveLock(procedure);
1001      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1002      int waitingCount = wakeWaitingProcedures(lock);
1003      wakePollIfNeeded(waitingCount);
1004    } finally {
1005      schedUnlock();
1006    }
1007  }
1008
1009  /**
1010   * For debugging. Expensive.
1011   */
1012  public String dumpLocks() throws IOException {
1013    schedLock();
1014    try {
1015      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1016      return this.locking.toString();
1017    } finally {
1018      schedUnlock();
1019    }
1020  }
1021}