001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableExistsException;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
031import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
032import org.apache.hadoop.hbase.procedure2.LockAndQueue;
033import org.apache.hadoop.hbase.procedure2.LockStatus;
034import org.apache.hadoop.hbase.procedure2.LockedResource;
035import org.apache.hadoop.hbase.procedure2.LockedResourceType;
036import org.apache.hadoop.hbase.procedure2.Procedure;
037import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
038import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
039import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046
047/**
048 * ProcedureScheduler for the Master Procedures.
049 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
050 * that can be executed without having to wait on a lock.
051 * Most of the master operations can be executed concurrently, if they
052 * are operating on different tables (e.g. two create table procedures can be performed
053 * at the same time) or against two different servers; say two servers that crashed at
054 * about the same time.
055 *
056 * <p>Each procedure should implement an Interface providing information for this queue.
057 * For example table related procedures should implement TableProcedureInterface.
058 * Each procedure will be pushed in its own queue, and based on the operation type
059 * we may make smarter decisions: e.g. we can abort all the operations preceding
060 * a delete table, or similar.
061 *
062 * <h4>Concurrency control</h4>
063 * Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap,
064 * serverBuckets) is controlled by schedLock(). This mainly includes:<br>
065 * <ul>
066 *   <li>
067 *     {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue
068 *     when:
069 *     <ol>
070 *       <li>Queue was empty before push (so must have been out of run-queue)</li>
071 *       <li>Child procedure is added (which means parent procedure holds exclusive lock, and it
072 *           must have moved Queue out of run-queue)</li>
073 *     </ol>
074 *   </li>
075 *   <li>
076 *     {@link #poll(long)}: A poll will remove a Queue from run-queue when:
077 *     <ol>
078 *       <li>Queue becomes empty after poll</li>
079 *       <li>Exclusive lock is requested by polled procedure and lock is available (returns the
080 *           procedure)</li>
081 *       <li>Exclusive lock is requested but lock is not available (returns null)</li>
082 *       <li>Polled procedure is child of parent holding exclusive lock and the next procedure is
083 *           not a child</li>
084 *     </ol>
085 *   </li>
086 *   <li>
087 *     Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
088 *     <ol>
089 *       <li>Exclusive lock</li>
090 *       <li>Last shared lock (in case queue was removed because next procedure in queue required
091 *           exclusive lock)</li>
092 *     </ol>
093 *   </li>
094 * </ul>
095 */
096@InterfaceAudience.Private
097public class MasterProcedureScheduler extends AbstractProcedureScheduler {
098  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
099
100  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
101    (n, k) -> n.compareKey((ServerName) k);
102  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
103    (n, k) -> n.compareKey((TableName) k);
104  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
105    (n, k) -> n.compareKey((TableName) k);
106
107  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
108  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
109  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
110
111  private final ServerQueue[] serverBuckets = new ServerQueue[128];
112  private TableQueue tableMap = null;
113  private MetaQueue metaMap = null;
114
115  private final SchemaLocking locking;
116
117  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
118    locking = new SchemaLocking(procedureRetriever);
119  }
120
121  @Override
122  public void yield(final Procedure proc) {
123    push(proc, false, true);
124  }
125
126  @Override
127  protected void enqueue(final Procedure proc, final boolean addFront) {
128    if (isMetaProcedure(proc) ||
129        (isTableProcedure(proc) && getTableName(proc).equals(TableName.META_TABLE_NAME))) {
130      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
131    } else if (isTableProcedure(proc)) {
132      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
133    } else if (isServerProcedure(proc)) {
134      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
135      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
136    } else {
137      // TODO: at the moment we only have Table and Server procedures
138      // if you are implementing a non-table/non-server procedure, you have two options: create
139      // a group for all the non-table/non-server procedures or try to find a key for your
140      // non-table/non-server procedures and implement something similar to the TableRunQueue.
141      throw new UnsupportedOperationException(
142        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
143    }
144  }
145
146  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
147      Procedure<?> proc, boolean addFront) {
148    queue.add(proc, addFront);
149    // For the following conditions, we will put the queue back into execution
150    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
151    // which means it can be executed immediately.
152    // 2. The exclusive lock for this queue has not been held.
153    // 3. The given procedure has the exclusive lock permission for this queue.
154    Supplier<String> reason = null;
155    if (proc.hasLock()) {
156      reason = () -> proc + " has lock";
157    } else if (proc.isLockedWhenLoading()) {
158      reason = () -> proc + " restores lock when restarting";
159    } else if (!queue.getLockStatus().hasExclusiveLock()) {
160      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
161    } else if (queue.getLockStatus().hasLockAccess(proc)) {
162      reason = () -> proc + " has the excusive lock access";
163    }
164    if (reason != null) {
165      addToRunQueue(fairq, queue, reason);
166    }
167  }
168
169  @Override
170  protected boolean queueHasRunnables() {
171    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
172      serverRunQueue.hasRunnables();
173  }
174
175  @Override
176  protected Procedure dequeue(boolean onlyUrgent) {
177    // meta procedure is always the first priority
178    Procedure<?> pollResult = doPoll(metaRunQueue);
179    if (onlyUrgent) {
180      return pollResult;
181    }
182    // For now, let server handling have precedence over table handling; presumption is that it
183    // is more important handling crashed servers than it is running the
184    // enabling/disabling tables, etc.
185    if (pollResult == null) {
186      pollResult = doPoll(serverRunQueue);
187    }
188    if (pollResult == null) {
189      pollResult = doPoll(tableRunQueue);
190    }
191    return pollResult;
192  }
193
194  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
195    LockStatus s = rq.getLockStatus();
196    // if we have the lock access, we are ready
197    if (s.hasLockAccess(proc)) {
198      return true;
199    }
200    boolean xlockReq = rq.requireExclusiveLock(proc);
201    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
202    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
203    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
204  }
205
206  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
207    Queue<T> rq = fairq.poll();
208    if (rq == null || !rq.isAvailable()) {
209      return null;
210    }
211    // loop until we find out a procedure which is ready to run, or if we have checked all the
212    // procedures, then we give up and remove the queue from run queue.
213    for (int i = 0, n = rq.size(); i < n; i++) {
214      Procedure<?> proc = rq.poll();
215      if (isLockReady(proc, rq)) {
216        // the queue is empty, remove from run queue
217        if (rq.isEmpty()) {
218          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
219        }
220        return proc;
221      }
222      // we are not ready to run, add back and try the next procedure
223      rq.add(proc, false);
224    }
225    // no procedure is ready for execution, remove from run queue
226    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
227    return null;
228  }
229
230  @Override
231  public List<LockedResource> getLocks() {
232    schedLock();
233    try {
234      return locking.getLocks();
235    } finally {
236      schedUnlock();
237    }
238  }
239
240  @Override
241  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
242    schedLock();
243    try {
244      return locking.getLockResource(resourceType, resourceName);
245    } finally {
246      schedUnlock();
247    }
248  }
249
250  @Override
251  public void clear() {
252    schedLock();
253    try {
254      clearQueue();
255      locking.clear();
256    } finally {
257      schedUnlock();
258    }
259  }
260
261  private void clearQueue() {
262    // Remove Servers
263    for (int i = 0; i < serverBuckets.length; ++i) {
264      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
265      serverBuckets[i] = null;
266    }
267
268    // Remove Tables
269    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
270    tableMap = null;
271
272    assert size() == 0 : "expected queue size to be 0, got " + size();
273  }
274
275  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
276      FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
277    while (treeMap != null) {
278      Queue<T> node = AvlTree.getFirst(treeMap);
279      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
280      if (fairq != null) {
281        removeFromRunQueue(fairq, node, () -> "clear all queues");
282      }
283    }
284  }
285
286  private int queueSize(Queue<?> head) {
287    int count = 0;
288    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
289    while (iter.hasNext()) {
290      count += iter.next().size();
291    }
292    return count;
293  }
294
295  @Override
296  protected int queueSize() {
297    int count = 0;
298    for (ServerQueue serverMap : serverBuckets) {
299      count += queueSize(serverMap);
300    }
301    count += queueSize(tableMap);
302    count += queueSize(metaMap);
303    return count;
304  }
305
306  @Override
307  public void completionCleanup(final Procedure proc) {
308    if (proc instanceof TableProcedureInterface) {
309      TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
310      boolean tableDeleted;
311      if (proc.hasException()) {
312        Exception procEx = proc.getException().unwrapRemoteException();
313        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
314          // create failed because the table already exist
315          tableDeleted = !(procEx instanceof TableExistsException);
316        } else {
317          // the operation failed because the table does not exist
318          tableDeleted = (procEx instanceof TableNotFoundException);
319        }
320      } else {
321        // the table was deleted
322        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
323      }
324      if (tableDeleted) {
325        markTableAsDeleted(iProcTable.getTableName(), proc);
326        return;
327      }
328    } else if (proc instanceof ServerProcedureInterface) {
329      tryCleanupServerQueue(getServerName(proc), proc);
330    } else {
331      // No cleanup for other procedure types, yet.
332      return;
333    }
334  }
335
336  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
337      Supplier<String> reason) {
338    if (LOG.isDebugEnabled()) {
339      LOG.debug("Add {} to run queue because: {}", queue, reason.get());
340    }
341    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
342      fairq.add(queue);
343    }
344  }
345
346  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
347      Queue<T> queue, Supplier<String> reason) {
348    if (LOG.isDebugEnabled()) {
349      LOG.debug("Remove {} from run queue because: {}", queue, reason.get());
350    }
351    if (AvlIterableList.isLinked(queue)) {
352      fairq.remove(queue);
353    }
354  }
355
356  // ============================================================================
357  //  Table Queue Lookup Helpers
358  // ============================================================================
359  private TableQueue getTableQueue(TableName tableName) {
360    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
361    if (node != null) return node;
362
363    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
364        locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
365    tableMap = AvlTree.insert(tableMap, node);
366    return node;
367  }
368
369  private void removeTableQueue(TableName tableName) {
370    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
371    locking.removeTableLock(tableName);
372  }
373
374
375  private static boolean isTableProcedure(Procedure proc) {
376    return proc instanceof TableProcedureInterface;
377  }
378
379  private static TableName getTableName(Procedure proc) {
380    return ((TableProcedureInterface)proc).getTableName();
381  }
382
383  // ============================================================================
384  //  Server Queue Lookup Helpers
385  // ============================================================================
386  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
387    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
388    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
389    if (node != null) {
390      return node;
391    }
392    int priority;
393    if (proc != null) {
394      priority = MasterProcedureUtil.getServerPriority(proc);
395    } else {
396      priority = 1;
397    }
398    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
399    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
400    return node;
401  }
402
403  private void removeServerQueue(ServerName serverName) {
404    int index = getBucketIndex(serverBuckets, serverName.hashCode());
405    serverBuckets[index] =
406      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
407    locking.removeServerLock(serverName);
408  }
409
410  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
411    schedLock();
412    try {
413      int index = getBucketIndex(serverBuckets, serverName.hashCode());
414      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
415      if (node == null) {
416        return;
417      }
418
419      LockAndQueue lock = locking.getServerLock(serverName);
420      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
421        removeFromRunQueue(serverRunQueue, node,
422          () -> "clean up server queue after " + proc + " completed");
423        removeServerQueue(serverName);
424      }
425    } finally {
426      schedUnlock();
427    }
428  }
429
430  private static int getBucketIndex(Object[] buckets, int hashCode) {
431    return Math.abs(hashCode) % buckets.length;
432  }
433
434  private static boolean isServerProcedure(Procedure proc) {
435    return proc instanceof ServerProcedureInterface;
436  }
437
438  private static ServerName getServerName(Procedure proc) {
439    return ((ServerProcedureInterface)proc).getServerName();
440  }
441
442  // ============================================================================
443  //  Meta Queue Lookup Helpers
444  // ============================================================================
445  private MetaQueue getMetaQueue() {
446    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
447    if (node != null) {
448      return node;
449    }
450    node = new MetaQueue(locking.getMetaLock());
451    metaMap = AvlTree.insert(metaMap, node);
452    return node;
453  }
454
455  private static boolean isMetaProcedure(Procedure<?> proc) {
456    return proc instanceof MetaProcedureInterface;
457  }
458
459  // ============================================================================
460  //  Table Locking Helpers
461  // ============================================================================
462  /**
463   * Get lock info for a resource of specified type and name and log details
464   */
465  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
466    if (!LOG.isDebugEnabled()) {
467      return;
468    }
469
470    LockedResource lockedResource = getLockResource(resourceType, resourceName);
471    if (lockedResource != null) {
472      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" +
473          lockedResource.getSharedLockCount();
474
475      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
476      if (proc != null) {
477        msg += ", exclusively locked by procId=" + proc.getProcId();
478      }
479      LOG.debug(msg);
480    }
481  }
482
483  /**
484   * Suspend the procedure if the specified table is already locked.
485   * Other operations in the table-queue will be executed after the lock is released.
486   * @param procedure the procedure trying to acquire the lock
487   * @param table Table to lock
488   * @return true if the procedure has to wait for the table to be available
489   */
490  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
491    schedLock();
492    try {
493      final String namespace = table.getNamespaceAsString();
494      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
495      final LockAndQueue tableLock = locking.getTableLock(table);
496      if (!namespaceLock.trySharedLock(procedure)) {
497        waitProcedure(namespaceLock, procedure);
498        logLockedResource(LockedResourceType.NAMESPACE, namespace);
499        return true;
500      }
501      if (!tableLock.tryExclusiveLock(procedure)) {
502        namespaceLock.releaseSharedLock();
503        waitProcedure(tableLock, procedure);
504        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
505        return true;
506      }
507      removeFromRunQueue(tableRunQueue, getTableQueue(table),
508        () -> procedure + " held the exclusive lock");
509      return false;
510    } finally {
511      schedUnlock();
512    }
513  }
514
515  /**
516   * Wake the procedures waiting for the specified table
517   * @param procedure the procedure releasing the lock
518   * @param table the name of the table that has the exclusive lock
519   */
520  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
521    schedLock();
522    try {
523      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
524      final LockAndQueue tableLock = locking.getTableLock(table);
525      int waitingCount = 0;
526      if (tableLock.releaseExclusiveLock(procedure)) {
527        waitingCount += wakeWaitingProcedures(tableLock);
528      }
529      if (namespaceLock.releaseSharedLock()) {
530        waitingCount += wakeWaitingProcedures(namespaceLock);
531      }
532      addToRunQueue(tableRunQueue, getTableQueue(table),
533        () -> procedure + " released the exclusive lock");
534      wakePollIfNeeded(waitingCount);
535    } finally {
536      schedUnlock();
537    }
538  }
539
540  /**
541   * Suspend the procedure if the specified table is already locked.
542   * other "read" operations in the table-queue may be executed concurrently,
543   * @param procedure the procedure trying to acquire the lock
544   * @param table Table to lock
545   * @return true if the procedure has to wait for the table to be available
546   */
547  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
548    return waitTableQueueSharedLock(procedure, table) == null;
549  }
550
551  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
552    schedLock();
553    try {
554      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
555      final LockAndQueue tableLock = locking.getTableLock(table);
556      if (!namespaceLock.trySharedLock(procedure)) {
557        waitProcedure(namespaceLock, procedure);
558        return null;
559      }
560
561      if (!tableLock.trySharedLock(procedure)) {
562        namespaceLock.releaseSharedLock();
563        waitProcedure(tableLock, procedure);
564        return null;
565      }
566
567      return getTableQueue(table);
568    } finally {
569      schedUnlock();
570    }
571  }
572
573  /**
574   * Wake the procedures waiting for the specified table
575   * @param procedure the procedure releasing the lock
576   * @param table the name of the table that has the shared lock
577   */
578  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
579    schedLock();
580    try {
581      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
582      final LockAndQueue tableLock = locking.getTableLock(table);
583      int waitingCount = 0;
584      if (tableLock.releaseSharedLock()) {
585        addToRunQueue(tableRunQueue, getTableQueue(table),
586          () -> procedure + " released the shared lock");
587        waitingCount += wakeWaitingProcedures(tableLock);
588      }
589      if (namespaceLock.releaseSharedLock()) {
590        waitingCount += wakeWaitingProcedures(namespaceLock);
591      }
592      wakePollIfNeeded(waitingCount);
593    } finally {
594      schedUnlock();
595    }
596  }
597
598  /**
599   * Tries to remove the queue and the table-lock of the specified table.
600   * If there are new operations pending (e.g. a new create),
601   * the remove will not be performed.
602   * @param table the name of the table that should be marked as deleted
603   * @param procedure the procedure that is removing the table
604   * @return true if deletion succeeded, false otherwise meaning that there are
605   *     other new operations pending for that table (e.g. a new create).
606   */
607  @VisibleForTesting
608  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
609    schedLock();
610    try {
611      final TableQueue queue = getTableQueue(table);
612      final LockAndQueue tableLock = locking.getTableLock(table);
613      if (queue == null) return true;
614
615      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
616        // remove the table from the run-queue and the map
617        if (AvlIterableList.isLinked(queue)) {
618          tableRunQueue.remove(queue);
619        }
620        removeTableQueue(table);
621      } else {
622        // TODO: If there are no create, we can drop all the other ops
623        return false;
624      }
625    } finally {
626      schedUnlock();
627    }
628    return true;
629  }
630
631  // ============================================================================
632  //  Region Locking Helpers
633  // ============================================================================
634  /**
635   * Suspend the procedure if the specified region is already locked.
636   * @param procedure the procedure trying to acquire the lock on the region
637   * @param regionInfo the region we are trying to lock
638   * @return true if the procedure has to wait for the regions to be available
639   */
640  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
641    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
642  }
643
644  /**
645   * Suspend the procedure if the specified set of regions are already locked.
646   * @param procedure the procedure trying to acquire the lock on the regions
647   * @param table the table name of the regions we are trying to lock
648   * @param regionInfo the list of regions we are trying to lock
649   * @return true if the procedure has to wait for the regions to be available
650   */
651  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
652      final RegionInfo... regionInfo) {
653    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
654    schedLock();
655    try {
656      assert table != null;
657      if (waitTableSharedLock(procedure, table)) {
658        return true;
659      }
660
661      // acquire region xlocks or wait
662      boolean hasLock = true;
663      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
664      for (int i = 0; i < regionInfo.length; ++i) {
665        assert regionInfo[i] != null;
666        assert regionInfo[i].getTable() != null;
667        assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
668        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
669
670        regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName());
671        if (!regionLocks[i].tryExclusiveLock(procedure)) {
672          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
673              regionLocks[i].getExclusiveLockProcIdOwner());
674          waitProcedure(regionLocks[i], procedure);
675          hasLock = false;
676          while (i-- > 0) {
677            regionLocks[i].releaseExclusiveLock(procedure);
678          }
679          break;
680        } else {
681          LOG.info("Took xlock for {}", procedure);
682        }
683      }
684
685      if (!hasLock) {
686        wakeTableSharedLock(procedure, table);
687      }
688      return !hasLock;
689    } finally {
690      schedUnlock();
691    }
692  }
693
694  /**
695   * Wake the procedures waiting for the specified region
696   * @param procedure the procedure that was holding the region
697   * @param regionInfo the region the procedure was holding
698   */
699  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
700    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
701  }
702
703  /**
704   * Wake the procedures waiting for the specified regions
705   * @param procedure the procedure that was holding the regions
706   * @param regionInfo the list of regions the procedure was holding
707   */
708  public void wakeRegions(final Procedure<?> procedure,final TableName table,
709      final RegionInfo... regionInfo) {
710    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
711    schedLock();
712    try {
713      int numProcs = 0;
714      final Procedure<?>[] nextProcs = new Procedure[regionInfo.length];
715      for (int i = 0; i < regionInfo.length; ++i) {
716        assert regionInfo[i].getTable().equals(table);
717        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
718
719        LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
720        if (regionLock.releaseExclusiveLock(procedure)) {
721          if (!regionLock.isWaitingQueueEmpty()) {
722            // release one procedure at the time since regions has an xlock
723            nextProcs[numProcs++] = regionLock.removeFirst();
724          } else {
725            locking.removeRegionLock(regionInfo[i].getEncodedName());
726          }
727        }
728      }
729
730      // awake procedures if any
731      for (int i = numProcs - 1; i >= 0; --i) {
732        wakeProcedure(nextProcs[i]);
733      }
734      wakePollIfNeeded(numProcs);
735      // release the table shared-lock.
736      wakeTableSharedLock(procedure, table);
737    } finally {
738      schedUnlock();
739    }
740  }
741
742  // ============================================================================
743  //  Namespace Locking Helpers
744  // ============================================================================
745  /**
746   * Suspend the procedure if the specified namespace is already locked.
747   * @see #wakeNamespaceExclusiveLock(Procedure,String)
748   * @param procedure the procedure trying to acquire the lock
749   * @param namespace Namespace to lock
750   * @return true if the procedure has to wait for the namespace to be available
751   */
752  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
753    schedLock();
754    try {
755      final LockAndQueue systemNamespaceTableLock =
756        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
757      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
758        waitProcedure(systemNamespaceTableLock, procedure);
759        logLockedResource(LockedResourceType.TABLE,
760          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
761        return true;
762      }
763
764      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
765      if (!namespaceLock.tryExclusiveLock(procedure)) {
766        systemNamespaceTableLock.releaseSharedLock();
767        waitProcedure(namespaceLock, procedure);
768        logLockedResource(LockedResourceType.NAMESPACE, namespace);
769        return true;
770      }
771      return false;
772    } finally {
773      schedUnlock();
774    }
775  }
776
777  /**
778   * Wake the procedures waiting for the specified namespace
779   * @see #waitNamespaceExclusiveLock(Procedure,String)
780   * @param procedure the procedure releasing the lock
781   * @param namespace the namespace that has the exclusive lock
782   */
783  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
784    schedLock();
785    try {
786      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
787      final LockAndQueue systemNamespaceTableLock =
788          locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
789      int waitingCount = 0;
790      if (namespaceLock.releaseExclusiveLock(procedure)) {
791        waitingCount += wakeWaitingProcedures(namespaceLock);
792      }
793      if (systemNamespaceTableLock.releaseSharedLock()) {
794        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
795          () -> procedure + " released namespace exclusive lock");
796        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
797      }
798      wakePollIfNeeded(waitingCount);
799    } finally {
800      schedUnlock();
801    }
802  }
803
804  // ============================================================================
805  //  Server Locking Helpers
806  // ============================================================================
807  /**
808   * Try to acquire the exclusive lock on the specified server.
809   * @see #wakeServerExclusiveLock(Procedure,ServerName)
810   * @param procedure the procedure trying to acquire the lock
811   * @param serverName Server to lock
812   * @return true if the procedure has to wait for the server to be available
813   */
814  public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
815    schedLock();
816    try {
817      final LockAndQueue lock = locking.getServerLock(serverName);
818      if (lock.tryExclusiveLock(procedure)) {
819        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
820        // so.
821        removeFromRunQueue(serverRunQueue,
822          getServerQueue(serverName,
823            procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
824              : null),
825          () -> procedure + " held exclusive lock");
826        return false;
827      }
828      waitProcedure(lock, procedure);
829      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
830      return true;
831    } finally {
832      schedUnlock();
833    }
834  }
835
836  /**
837   * Wake the procedures waiting for the specified server
838   * @see #waitServerExclusiveLock(Procedure,ServerName)
839   * @param procedure the procedure releasing the lock
840   * @param serverName the server that has the exclusive lock
841   */
842  public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
843    schedLock();
844    try {
845      final LockAndQueue lock = locking.getServerLock(serverName);
846      // Only SCP will acquire/release server lock so do not need to check the return value here.
847      lock.releaseExclusiveLock(procedure);
848      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
849      // so.
850      addToRunQueue(serverRunQueue,
851        getServerQueue(serverName,
852          procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
853            : null), () -> procedure + " released exclusive lock");
854      int waitingCount = wakeWaitingProcedures(lock);
855      wakePollIfNeeded(waitingCount);
856    } finally {
857      schedUnlock();
858    }
859  }
860
861  // ============================================================================
862  // Meta Locking Helpers
863  // ============================================================================
864  /**
865   * Try to acquire the exclusive lock on meta.
866   * @see #wakeMetaExclusiveLock(Procedure)
867   * @param procedure the procedure trying to acquire the lock
868   * @return true if the procedure has to wait for meta to be available
869   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
870   *             {@link RecoverMetaProcedure}.
871   */
872  @Deprecated
873  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
874    schedLock();
875    try {
876      final LockAndQueue lock = locking.getMetaLock();
877      if (lock.tryExclusiveLock(procedure)) {
878        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
879        return false;
880      }
881      waitProcedure(lock, procedure);
882      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
883      return true;
884    } finally {
885      schedUnlock();
886    }
887  }
888
889  /**
890   * Wake the procedures waiting for meta.
891   * @see #waitMetaExclusiveLock(Procedure)
892   * @param procedure the procedure releasing the lock
893   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
894   *             {@link RecoverMetaProcedure}.
895   */
896  @Deprecated
897  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
898    schedLock();
899    try {
900      final LockAndQueue lock = locking.getMetaLock();
901      lock.releaseExclusiveLock(procedure);
902      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
903      int waitingCount = wakeWaitingProcedures(lock);
904      wakePollIfNeeded(waitingCount);
905    } finally {
906      schedUnlock();
907    }
908  }
909
910  /**
911   * For debugging. Expensive.
912   */
913  @VisibleForTesting
914  public String dumpLocks() throws IOException {
915    schedLock();
916    try {
917      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
918      return this.locking.toString();
919    } finally {
920      schedUnlock();
921    }
922  }
923}