001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableExistsException;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
032import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
033import org.apache.hadoop.hbase.procedure2.LockAndQueue;
034import org.apache.hadoop.hbase.procedure2.LockStatus;
035import org.apache.hadoop.hbase.procedure2.LockedResource;
036import org.apache.hadoop.hbase.procedure2.LockedResourceType;
037import org.apache.hadoop.hbase.procedure2.Procedure;
038import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
039import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
041import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the
048 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the
049 * master operations can be executed concurrently, if they are operating on different tables (e.g.
050 * two create table procedures can be performed at the same time) or against two different servers;
051 * say two servers that crashed at about the same time.
052 * <p>
053 * Each procedure should implement an Interface providing information for this queue. For example
054 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed
055 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can
056 * abort all the operations preceding a delete table, or similar.
 * <h4>Concurrency control</h4> Concurrent access to member variables (metaRunQueue,
 * serverRunQueue, peerRunQueue, tableRunQueue, locking, metaMap, serverBuckets, peerMap,
 * tableMap) is controlled by schedLock(). This mainly includes:<br>
060 * <ul>
061 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when:
062 * <ol>
063 * <li>Queue was empty before push (so must have been out of run-queue)</li>
064 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have
065 * moved Queue out of run-queue)</li>
066 * </ol>
067 * </li>
068 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when:
069 * <ol>
070 * <li>Queue becomes empty after poll</li>
071 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the
072 * procedure)</li>
073 * <li>Exclusive lock is requested but lock is not available (returns null)</li>
074 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a
075 * child</li>
076 * </ol>
077 * </li>
078 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
079 * <ol>
080 * <li>Exclusive lock</li>
081 * <li>Last shared lock (in case queue was removed because next procedure in queue required
082 * exclusive lock)</li>
083 * </ol>
084 * </li>
085 * </ul>
086 */
087@InterfaceAudience.Private
088public class MasterProcedureScheduler extends AbstractProcedureScheduler {
089  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
090
091  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
092    (n, k) -> n.compareKey((ServerName) k);
093  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
094    (n, k) -> n.compareKey((TableName) k);
095  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
096    (n, k) -> n.compareKey((String) k);
097  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
098    (n, k) -> n.compareKey((TableName) k);
099
100  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
101  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
102  private final FairQueue<String> peerRunQueue = new FairQueue<>();
103  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
104
105  private final ServerQueue[] serverBuckets = new ServerQueue[128];
106  private TableQueue tableMap = null;
107  private PeerQueue peerMap = null;
108  private MetaQueue metaMap = null;
109
110  private final SchemaLocking locking;
111
112  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
113    locking = new SchemaLocking(procedureRetriever);
114  }
115
116  @Override
117  public void yield(final Procedure proc) {
118    push(proc, false, true);
119  }
120
121  @Override
122  protected void enqueue(final Procedure proc, final boolean addFront) {
123    if (isMetaProcedure(proc)) {
124      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
125    } else if (isTableProcedure(proc)) {
126      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
127    } else if (isServerProcedure(proc)) {
128      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
129      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
130    } else if (isPeerProcedure(proc)) {
131      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
132    } else {
133      // TODO: at the moment we only have Table and Server procedures
134      // if you are implementing a non-table/non-server procedure, you have two options: create
135      // a group for all the non-table/non-server procedures or try to find a key for your
136      // non-table/non-server procedures and implement something similar to the TableRunQueue.
137      throw new UnsupportedOperationException(
138        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
139    }
140  }
141
142  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
143    Procedure<?> proc, boolean addFront) {
144    queue.add(proc, addFront);
145    // For the following conditions, we will put the queue back into execution
146    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
147    // which means it can be executed immediately.
148    // 2. The exclusive lock for this queue has not been held.
149    // 3. The given procedure has the exclusive lock permission for this queue.
150    Supplier<String> reason = null;
151    if (proc.hasLock()) {
152      reason = () -> proc + " has lock";
153    } else if (proc.isLockedWhenLoading()) {
154      reason = () -> proc + " restores lock when restarting";
155    } else if (!queue.getLockStatus().hasExclusiveLock()) {
156      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
157    } else if (queue.getLockStatus().hasLockAccess(proc)) {
158      reason = () -> proc + " has the excusive lock access";
159    }
160    if (reason != null) {
161      addToRunQueue(fairq, queue, reason);
162    }
163  }
164
165  @Override
166  protected boolean queueHasRunnables() {
167    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables()
168      || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
169  }
170
171  @Override
172  protected Procedure dequeue() {
173    // meta procedure is always the first priority
174    Procedure<?> pollResult = doPoll(metaRunQueue);
175    // For now, let server handling have precedence over table handling; presumption is that it
176    // is more important handling crashed servers than it is running the
177    // enabling/disabling tables, etc.
178    if (pollResult == null) {
179      pollResult = doPoll(serverRunQueue);
180    }
181    if (pollResult == null) {
182      pollResult = doPoll(peerRunQueue);
183    }
184    if (pollResult == null) {
185      pollResult = doPoll(tableRunQueue);
186    }
187    return pollResult;
188  }
189
190  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
191    LockStatus s = rq.getLockStatus();
192    // if we have the lock access, we are ready
193    if (s.hasLockAccess(proc)) {
194      return true;
195    }
196    boolean xlockReq = rq.requireExclusiveLock(proc);
197    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
198    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
199    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
200  }
201
202  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
203    Queue<T> rq = fairq.poll();
204    if (rq == null || !rq.isAvailable()) {
205      return null;
206    }
207    // loop until we find out a procedure which is ready to run, or if we have checked all the
208    // procedures, then we give up and remove the queue from run queue.
209    for (int i = 0, n = rq.size(); i < n; i++) {
210      Procedure<?> proc = rq.poll();
211      if (isLockReady(proc, rq)) {
212        // the queue is empty, remove from run queue
213        if (rq.isEmpty()) {
214          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
215        }
216        return proc;
217      }
218      // we are not ready to run, add back and try the next procedure
219      rq.add(proc, false);
220    }
221    // no procedure is ready for execution, remove from run queue
222    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
223    return null;
224  }
225
226  @Override
227  public List<LockedResource> getLocks() {
228    schedLock();
229    try {
230      return locking.getLocks();
231    } finally {
232      schedUnlock();
233    }
234  }
235
236  @Override
237  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
238    schedLock();
239    try {
240      return locking.getLockResource(resourceType, resourceName);
241    } finally {
242      schedUnlock();
243    }
244  }
245
246  @Override
247  public void clear() {
248    schedLock();
249    try {
250      clearQueue();
251      locking.clear();
252    } finally {
253      schedUnlock();
254    }
255  }
256
257  private void clearQueue() {
258    // Remove Servers
259    for (int i = 0; i < serverBuckets.length; ++i) {
260      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
261      serverBuckets[i] = null;
262    }
263
264    // Remove Tables
265    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
266    tableMap = null;
267
268    // Remove Peers
269    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
270    peerMap = null;
271
272    assert size() == 0 : "expected queue size to be 0, got " + size();
273  }
274
275  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
276    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
277    while (treeMap != null) {
278      Queue<T> node = AvlTree.getFirst(treeMap);
279      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
280      if (fairq != null) {
281        removeFromRunQueue(fairq, node, () -> "clear all queues");
282      }
283    }
284  }
285
286  private int queueSize(Queue<?> head) {
287    int count = 0;
288    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
289    while (iter.hasNext()) {
290      count += iter.next().size();
291    }
292    return count;
293  }
294
295  @Override
296  protected int queueSize() {
297    int count = 0;
298    for (ServerQueue serverMap : serverBuckets) {
299      count += queueSize(serverMap);
300    }
301    count += queueSize(tableMap);
302    count += queueSize(peerMap);
303    count += queueSize(metaMap);
304    return count;
305  }
306
307  @Override
308  public void completionCleanup(final Procedure proc) {
309    if (proc instanceof TableProcedureInterface) {
310      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
311      boolean tableDeleted;
312      if (proc.hasException()) {
313        Exception procEx = proc.getException().unwrapRemoteException();
314        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
315          // create failed because the table already exist
316          tableDeleted = !(procEx instanceof TableExistsException);
317        } else {
318          // the operation failed because the table does not exist
319          tableDeleted = (procEx instanceof TableNotFoundException);
320        }
321      } else {
322        // the table was deleted
323        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
324      }
325      if (tableDeleted) {
326        markTableAsDeleted(iProcTable.getTableName(), proc);
327        return;
328      }
329    } else if (proc instanceof PeerProcedureInterface) {
330      tryCleanupPeerQueue(getPeerId(proc), proc);
331    } else if (proc instanceof ServerProcedureInterface) {
332      tryCleanupServerQueue(getServerName(proc), proc);
333    } else {
334      // No cleanup for other procedure types, yet.
335      return;
336    }
337  }
338
339  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
340    Supplier<String> reason) {
341    if (LOG.isTraceEnabled()) {
342      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
343    }
344    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
345      fairq.add(queue);
346    }
347  }
348
349  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
350    Queue<T> queue, Supplier<String> reason) {
351    if (LOG.isTraceEnabled()) {
352      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
353    }
354    if (AvlIterableList.isLinked(queue)) {
355      fairq.remove(queue);
356    }
357  }
358
359  // ============================================================================
360  // Table Queue Lookup Helpers
361  // ============================================================================
362  private TableQueue getTableQueue(TableName tableName) {
363    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
364    if (node != null) return node;
365
366    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
367      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
368    tableMap = AvlTree.insert(tableMap, node);
369    return node;
370  }
371
372  private void removeTableQueue(TableName tableName) {
373    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
374    locking.removeTableLock(tableName);
375  }
376
377  private static boolean isTableProcedure(Procedure<?> proc) {
378    return proc instanceof TableProcedureInterface;
379  }
380
381  private static TableName getTableName(Procedure<?> proc) {
382    return ((TableProcedureInterface) proc).getTableName();
383  }
384
385  // ============================================================================
386  // Server Queue Lookup Helpers
387  // ============================================================================
388  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
389    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
390    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
391    if (node != null) {
392      return node;
393    }
394    int priority;
395    if (proc != null) {
396      priority = MasterProcedureUtil.getServerPriority(proc);
397    } else {
398      priority = 1;
399    }
400    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
401    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
402    return node;
403  }
404
405  private void removeServerQueue(ServerName serverName) {
406    int index = getBucketIndex(serverBuckets, serverName.hashCode());
407    serverBuckets[index] =
408      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
409    locking.removeServerLock(serverName);
410  }
411
412  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
413    schedLock();
414    try {
415      int index = getBucketIndex(serverBuckets, serverName.hashCode());
416      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
417      if (node == null) {
418        return;
419      }
420
421      LockAndQueue lock = locking.getServerLock(serverName);
422      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
423        removeFromRunQueue(serverRunQueue, node,
424          () -> "clean up server queue after " + proc + " completed");
425        removeServerQueue(serverName);
426      }
427    } finally {
428      schedUnlock();
429    }
430  }
431
432  private static int getBucketIndex(Object[] buckets, int hashCode) {
433    return Math.abs(hashCode) % buckets.length;
434  }
435
436  private static boolean isServerProcedure(Procedure<?> proc) {
437    return proc instanceof ServerProcedureInterface;
438  }
439
440  private static ServerName getServerName(Procedure<?> proc) {
441    return ((ServerProcedureInterface) proc).getServerName();
442  }
443
444  // ============================================================================
445  // Peer Queue Lookup Helpers
446  // ============================================================================
447  private PeerQueue getPeerQueue(String peerId) {
448    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
449    if (node != null) {
450      return node;
451    }
452    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
453    peerMap = AvlTree.insert(peerMap, node);
454    return node;
455  }
456
457  private void removePeerQueue(String peerId) {
458    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
459    locking.removePeerLock(peerId);
460  }
461
462  private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
463    schedLock();
464    try {
465      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
466      if (queue == null) {
467        return;
468      }
469
470      final LockAndQueue lock = locking.getPeerLock(peerId);
471      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
472        removeFromRunQueue(peerRunQueue, queue,
473          () -> "clean up peer queue after " + procedure + " completed");
474        removePeerQueue(peerId);
475      }
476    } finally {
477      schedUnlock();
478    }
479  }
480
481  private static boolean isPeerProcedure(Procedure<?> proc) {
482    return proc instanceof PeerProcedureInterface;
483  }
484
485  private static String getPeerId(Procedure<?> proc) {
486    return ((PeerProcedureInterface) proc).getPeerId();
487  }
488
489  // ============================================================================
490  // Meta Queue Lookup Helpers
491  // ============================================================================
492  private MetaQueue getMetaQueue() {
493    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
494    if (node != null) {
495      return node;
496    }
497    node = new MetaQueue(locking.getMetaLock());
498    metaMap = AvlTree.insert(metaMap, node);
499    return node;
500  }
501
502  private static boolean isMetaProcedure(Procedure<?> proc) {
503    return proc instanceof MetaProcedureInterface;
504  }
505
506  // ============================================================================
507  // Table Locking Helpers
508  // ============================================================================
509  /**
510   * Get lock info for a resource of specified type and name and log details
511   */
512  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
513    if (!LOG.isDebugEnabled()) {
514      return;
515    }
516
517    LockedResource lockedResource = getLockResource(resourceType, resourceName);
518    if (lockedResource != null) {
519      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count="
520        + lockedResource.getSharedLockCount();
521
522      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
523      if (proc != null) {
524        msg += ", exclusively locked by procId=" + proc.getProcId();
525      }
526      LOG.debug(msg);
527    }
528  }
529
  /**
   * Suspend the procedure if the specified table is already locked. Other operations in the
   * table-queue will be executed after the lock is released.
   * <p>
   * Takes the namespace shared lock first, then the table exclusive lock. If the table lock
   * cannot be taken, the namespace shared lock acquired here is released again before the
   * procedure is parked on the table lock. On success the table queue is removed from the run
   * queue. Runs under the scheduler lock.
   * @param procedure the procedure trying to acquire the lock
   * @param table     Table to lock
   * @return true if the procedure has to wait for the table to be available
   */
  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final String namespace = table.getNamespaceAsString();
      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      final LockAndQueue tableLock = locking.getTableLock(table);
      if (!namespaceLock.trySharedLock(procedure)) {
        // namespace lock not available in shared mode: park on the namespace lock
        waitProcedure(namespaceLock, procedure);
        logLockedResource(LockedResourceType.NAMESPACE, namespace);
        return true;
      }
      if (!tableLock.tryExclusiveLock(procedure)) {
        // give back the namespace shared lock taken above before parking on the table lock
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
        return true;
      }
      // both locks held; take the table queue off the run queue while the xlock is held
      removeFromRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " held the exclusive lock");
      return false;
    } finally {
      schedUnlock();
    }
  }
561
  /**
   * Wake the procedures waiting for the specified table
   * <p>
   * Releases the table exclusive lock held by {@code procedure} and the namespace shared lock,
   * both taken by waitTableExclusiveLock. For each lock whose release call returns true, its
   * waiting procedures are woken. The table queue is then put back on the run queue
   * unconditionally. Runs under the scheduler lock.
   * @param procedure the procedure releasing the lock
   * @param table     the name of the table that has the exclusive lock
   */
  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      int waitingCount = 0;
      if (tableLock.releaseExclusiveLock(procedure)) {
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      // the queue was taken off the run queue when the exclusive lock was acquired
      addToRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " released the exclusive lock");
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }
586
587  /**
588   * Suspend the procedure if the specified table is already locked. other "read" operations in the
589   * table-queue may be executed concurrently,
590   * @param procedure the procedure trying to acquire the lock
591   * @param table     Table to lock
592   * @return true if the procedure has to wait for the table to be available
593   */
594  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
595    return waitTableQueueSharedLock(procedure, table) == null;
596  }
597
  /**
   * Tries to take the namespace shared lock and then the table shared lock for
   * {@code procedure}, under the scheduler lock.
   * @param procedure the procedure trying to acquire the locks
   * @param table     the table to lock
   * @return the table queue when both locks were taken, or null when the procedure was parked
   *         on one of the locks (any namespace shared lock taken here is released first)
   */
  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        return null;
      }

      if (!tableLock.trySharedLock(procedure)) {
        // give back the namespace shared lock taken above before parking on the table lock
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        return null;
      }

      return getTableQueue(table);
    } finally {
      schedUnlock();
    }
  }
619
  /**
   * Wake the procedures waiting for the specified table
   * <p>
   * Releases the table and namespace shared locks. Only when the table lock's release call
   * returns true is the table queue put back on the run queue and the table lock's waiters
   * woken. Namespace waiters are woken when the namespace release call returns true. Runs under
   * the scheduler lock.
   * @param procedure the procedure releasing the lock
   * @param table     the name of the table that has the shared lock
   */
  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      int waitingCount = 0;
      if (tableLock.releaseSharedLock()) {
        addToRunQueue(tableRunQueue, getTableQueue(table),
          () -> procedure + " released the shared lock");
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }
644
645  /**
646   * Tries to remove the queue and the table-lock of the specified table. If there are new
647   * operations pending (e.g. a new create), the remove will not be performed.
648   * @param table     the name of the table that should be marked as deleted
649   * @param procedure the procedure that is removing the table
650   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
651   *         pending for that table (e.g. a new create).
652   */
653  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
654    schedLock();
655    try {
656      final TableQueue queue = getTableQueue(table);
657      final LockAndQueue tableLock = locking.getTableLock(table);
658      if (queue == null) return true;
659
660      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
661        // remove the table from the run-queue and the map
662        if (AvlIterableList.isLinked(queue)) {
663          tableRunQueue.remove(queue);
664        }
665        removeTableQueue(table);
666      } else {
667        // TODO: If there are no create, we can drop all the other ops
668        return false;
669      }
670    } finally {
671      schedUnlock();
672    }
673    return true;
674  }
675
676  // ============================================================================
677  // Region Locking Helpers
678  // ============================================================================
679  /**
680   * Suspend the procedure if the specified region is already locked.
681   * @param procedure  the procedure trying to acquire the lock on the region
682   * @param regionInfo the region we are trying to lock
683   * @return true if the procedure has to wait for the regions to be available
684   */
685  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
686    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
687  }
688
  /**
   * Suspend the procedure if the specified set of regions are already locked.
   * <p>
   * Takes the table shared lock (via waitTableSharedLock), then the exclusive lock of every
   * region in sorted order. If any region lock is unavailable, the procedure is parked on that
   * lock, the region locks taken so far in this call are released, and the table shared lock is
   * released again. Runs under the scheduler lock.
   * @param procedure   the procedure trying to acquire the lock on the regions
   * @param table       the table name of the regions we are trying to lock
   * @param regionInfos the list of regions we are trying to lock; sorted in place
   * @return true if the procedure has to wait for the regions to be available
   */
  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
    final RegionInfo... regionInfos) {
    // Sorts the caller's array in place. Presumably a fixed acquisition order avoids deadlocks
    // between procedures locking overlapping sets of regions - TODO confirm.
    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
    schedLock();
    try {
      assert table != null;
      if (waitTableSharedLock(procedure, table)) {
        return true;
      }

      // acquire region xlocks or wait
      boolean hasLock = true;
      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length];
      for (int i = 0; i < regionInfos.length; ++i) {
        assert regionInfos[i] != null;
        assert regionInfos[i].getTable() != null;
        assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure;
        // NOTE(review): reference comparison; only catches the same RegionInfo instance twice
        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
          : "duplicate region: " + regionInfos[i];

        regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName());
        if (!regionLocks[i].tryExclusiveLock(procedure)) {
          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
            regionLocks[i].getExclusiveLockProcIdOwner());
          waitProcedure(regionLocks[i], procedure);
          hasLock = false;
          // roll back the region locks taken earlier in this call
          while (i-- > 0) {
            regionLocks[i].releaseExclusiveLock(procedure);
          }
          break;
        } else {
          LOG.info("Took xlock for {}", procedure);
        }
      }

      if (!hasLock) {
        // give back the table (and namespace) shared lock taken above
        wakeTableSharedLock(procedure, table);
      }
      return !hasLock;
    } finally {
      schedUnlock();
    }
  }
739
740  /**
741   * Wake the procedures waiting for the specified region
742   * @param procedure  the procedure that was holding the region
743   * @param regionInfo the region the procedure was holding
744   */
745  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
746    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
747  }
748
  /**
   * Wake the procedures waiting for the specified regions
   * <p>
   * For every region whose exclusive lock is released by {@code procedure}, one waiting
   * procedure (if any) is woken; if nobody waits, the region lock is dropped. Finally the table
   * shared lock taken by waitRegions is released. Runs under the scheduler lock.
   * @param procedure   the procedure that was holding the regions
   * @param table       the table the regions belong to
   * @param regionInfos the list of regions the procedure was holding; sorted in place
   */
  public void wakeRegions(final Procedure<?> procedure, final TableName table,
    final RegionInfo... regionInfos) {
    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
    schedLock();
    try {
      int numProcs = 0;
      final Procedure<?>[] nextProcs = new Procedure[regionInfos.length];
      for (int i = 0; i < regionInfos.length; ++i) {
        assert regionInfos[i].getTable().equals(table);
        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
          : "duplicate region: " + regionInfos[i];

        LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName());
        if (regionLock.releaseExclusiveLock(procedure)) {
          if (!regionLock.isWaitingQueueEmpty()) {
            // wake only one waiter: the region lock is exclusive, so only one can take it next
            nextProcs[numProcs++] = regionLock.removeFirst();
          } else {
            // nobody is waiting: drop the region lock entirely
            locking.removeRegionLock(regionInfos[i].getEncodedName());
          }
        }
      }

      // Wake the collected procedures, last region first. NOTE(review): presumably chosen so
      // they end up in region order if wakeProcedure re-queues at the front - TODO confirm.
      for (int i = numProcs - 1; i >= 0; --i) {
        wakeProcedure(nextProcs[i]);
      }
      wakePollIfNeeded(numProcs);
      // release the table shared-lock taken by waitRegions.
      wakeTableSharedLock(procedure, table);
    } finally {
      schedUnlock();
    }
  }
788
789  // ============================================================================
790  // Namespace Locking Helpers
791  // ============================================================================
792  /**
793   * Suspend the procedure if the specified namespace is already locked.
794   * @see #wakeNamespaceExclusiveLock(Procedure,String)
795   * @param procedure the procedure trying to acquire the lock
796   * @param namespace Namespace to lock
797   * @return true if the procedure has to wait for the namespace to be available
798   */
799  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
800    schedLock();
801    try {
802      final LockAndQueue systemNamespaceTableLock =
803        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
804      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
805        waitProcedure(systemNamespaceTableLock, procedure);
806        logLockedResource(LockedResourceType.TABLE,
807          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
808        return true;
809      }
810
811      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
812      if (!namespaceLock.tryExclusiveLock(procedure)) {
813        systemNamespaceTableLock.releaseSharedLock();
814        waitProcedure(namespaceLock, procedure);
815        logLockedResource(LockedResourceType.NAMESPACE, namespace);
816        return true;
817      }
818      return false;
819    } finally {
820      schedUnlock();
821    }
822  }
823
824  /**
825   * Wake the procedures waiting for the specified namespace
826   * @see #waitNamespaceExclusiveLock(Procedure,String)
827   * @param procedure the procedure releasing the lock
828   * @param namespace the namespace that has the exclusive lock
829   */
830  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
831    schedLock();
832    try {
833      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
834      final LockAndQueue systemNamespaceTableLock =
835        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
836      int waitingCount = 0;
837      if (namespaceLock.releaseExclusiveLock(procedure)) {
838        waitingCount += wakeWaitingProcedures(namespaceLock);
839      }
840      if (systemNamespaceTableLock.releaseSharedLock()) {
841        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
842          () -> procedure + " released namespace exclusive lock");
843        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
844      }
845      wakePollIfNeeded(waitingCount);
846    } finally {
847      schedUnlock();
848    }
849  }
850
851  // ============================================================================
852  // Server Locking Helpers
853  // ============================================================================
854  /**
855   * Try to acquire the exclusive lock on the specified server.
856   * @see #wakeServerExclusiveLock(Procedure,ServerName)
857   * @param procedure  the procedure trying to acquire the lock
858   * @param serverName Server to lock
859   * @return true if the procedure has to wait for the server to be available
860   */
861  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
862    final ServerName serverName) {
863    schedLock();
864    try {
865      final LockAndQueue lock = locking.getServerLock(serverName);
866      if (lock.tryExclusiveLock(procedure)) {
867        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
868        // so.
869        removeFromRunQueue(serverRunQueue,
870          getServerQueue(serverName,
871            procedure instanceof ServerProcedureInterface
872              ? (ServerProcedureInterface) procedure
873              : null),
874          () -> procedure + " held exclusive lock");
875        return false;
876      }
877      waitProcedure(lock, procedure);
878      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
879      return true;
880    } finally {
881      schedUnlock();
882    }
883  }
884
885  /**
886   * Wake the procedures waiting for the specified server
887   * @see #waitServerExclusiveLock(Procedure,ServerName)
888   * @param procedure  the procedure releasing the lock
889   * @param serverName the server that has the exclusive lock
890   */
891  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
892    schedLock();
893    try {
894      final LockAndQueue lock = locking.getServerLock(serverName);
895      // Only SCP will acquire/release server lock so do not need to check the return value here.
896      lock.releaseExclusiveLock(procedure);
897      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
898      // so.
899      addToRunQueue(serverRunQueue,
900        getServerQueue(serverName,
901          procedure instanceof ServerProcedureInterface
902            ? (ServerProcedureInterface) procedure
903            : null),
904        () -> procedure + " released exclusive lock");
905      int waitingCount = wakeWaitingProcedures(lock);
906      wakePollIfNeeded(waitingCount);
907    } finally {
908      schedUnlock();
909    }
910  }
911
912  // ============================================================================
913  // Peer Locking Helpers
914  // ============================================================================
915  private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
916    return proc.getPeerOperationType() != PeerOperationType.REFRESH;
917  }
918
919  /**
920   * Try to acquire the exclusive lock on the specified peer.
921   * @see #wakePeerExclusiveLock(Procedure, String)
922   * @param procedure the procedure trying to acquire the lock
923   * @param peerId    peer to lock
924   * @return true if the procedure has to wait for the peer to be available
925   */
926  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
927    schedLock();
928    try {
929      final LockAndQueue lock = locking.getPeerLock(peerId);
930      if (lock.tryExclusiveLock(procedure)) {
931        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
932          () -> procedure + " held exclusive lock");
933        return false;
934      }
935      waitProcedure(lock, procedure);
936      logLockedResource(LockedResourceType.PEER, peerId);
937      return true;
938    } finally {
939      schedUnlock();
940    }
941  }
942
943  /**
944   * Wake the procedures waiting for the specified peer
945   * @see #waitPeerExclusiveLock(Procedure, String)
946   * @param procedure the procedure releasing the lock
947   * @param peerId    the peer that has the exclusive lock
948   */
949  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
950    schedLock();
951    try {
952      final LockAndQueue lock = locking.getPeerLock(peerId);
953      if (lock.releaseExclusiveLock(procedure)) {
954        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
955          () -> procedure + " released exclusive lock");
956        int waitingCount = wakeWaitingProcedures(lock);
957        wakePollIfNeeded(waitingCount);
958      }
959    } finally {
960      schedUnlock();
961    }
962  }
963
964  // ============================================================================
965  // Meta Locking Helpers
966  // ============================================================================
967  /**
968   * Try to acquire the exclusive lock on meta.
969   * @see #wakeMetaExclusiveLock(Procedure)
970   * @param procedure the procedure trying to acquire the lock
971   * @return true if the procedure has to wait for meta to be available
972   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
973   *             {@link RecoverMetaProcedure}.
974   */
975  @Deprecated
976  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
977    schedLock();
978    try {
979      final LockAndQueue lock = locking.getMetaLock();
980      if (lock.tryExclusiveLock(procedure)) {
981        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
982        return false;
983      }
984      waitProcedure(lock, procedure);
985      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
986      return true;
987    } finally {
988      schedUnlock();
989    }
990  }
991
992  /**
993   * Wake the procedures waiting for meta.
994   * @see #waitMetaExclusiveLock(Procedure)
995   * @param procedure the procedure releasing the lock
996   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
997   *             {@link RecoverMetaProcedure}.
998   */
999  @Deprecated
1000  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1001    schedLock();
1002    try {
1003      final LockAndQueue lock = locking.getMetaLock();
1004      lock.releaseExclusiveLock(procedure);
1005      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1006      int waitingCount = wakeWaitingProcedures(lock);
1007      wakePollIfNeeded(waitingCount);
1008    } finally {
1009      schedUnlock();
1010    }
1011  }
1012
1013  /**
1014   * For debugging. Expensive.
1015   */
1016  public String dumpLocks() throws IOException {
1017    schedLock();
1018    try {
1019      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1020      return this.locking.toString();
1021    } finally {
1022      schedUnlock();
1023    }
1024  }
1025}