001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.function.Function; 024import java.util.function.Supplier; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.TableExistsException; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.TableNotFoundException; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; 031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; 032import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; 033import org.apache.hadoop.hbase.procedure2.LockAndQueue; 034import org.apache.hadoop.hbase.procedure2.LockStatus; 035import org.apache.hadoop.hbase.procedure2.LockedResource; 036import org.apache.hadoop.hbase.procedure2.LockedResourceType; 037import org.apache.hadoop.hbase.procedure2.Procedure; 038import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 039import 
org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; 040import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; 041import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 047 048/** 049 * ProcedureScheduler for the Master Procedures. 050 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures 051 * that can be executed without having to wait on a lock. 052 * Most of the master operations can be executed concurrently, if they 053 * are operating on different tables (e.g. two create table procedures can be performed 054 * at the same time) or against two different servers; say two servers that crashed at 055 * about the same time. 056 * 057 * <p>Each procedure should implement an Interface providing information for this queue. 058 * For example table related procedures should implement TableProcedureInterface. 059 * Each procedure will be pushed in its own queue, and based on the operation type 060 * we may make smarter decisions: e.g. we can abort all the operations preceding 061 * a delete table, or similar. 062 * 063 * <h4>Concurrency control</h4> 064 * Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap, 065 * serverBuckets) is controlled by schedLock(). 
This mainly includes:<br> 066 * <ul> 067 * <li> 068 * {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue 069 * when: 070 * <ol> 071 * <li>Queue was empty before push (so must have been out of run-queue)</li> 072 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it 073 * must have moved Queue out of run-queue)</li> 074 * </ol> 075 * </li> 076 * <li> 077 * {@link #poll(long)}: A poll will remove a Queue from run-queue when: 078 * <ol> 079 * <li>Queue becomes empty after poll</li> 080 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the 081 * procedure)</li> 082 * <li>Exclusive lock is requested but lock is not available (returns null)</li> 083 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is 084 * not a child</li> 085 * </ol> 086 * </li> 087 * <li> 088 * Namespace/table/region locks: Queue is added back to run-queue when lock being released is: 089 * <ol> 090 * <li>Exclusive lock</li> 091 * <li>Last shared lock (in case queue was removed because next procedure in queue required 092 * exclusive lock)</li> 093 * </ol> 094 * </li> 095 * </ul> 096 */ 097@InterfaceAudience.Private 098public class MasterProcedureScheduler extends AbstractProcedureScheduler { 099 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); 100 101 private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR = 102 (n, k) -> n.compareKey((ServerName) k); 103 private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR = 104 (n, k) -> n.compareKey((TableName) k); 105 private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR = 106 (n, k) -> n.compareKey((String) k); 107 private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = 108 (n, k) -> n.compareKey((TableName) k); 109 110 private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); 
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();

// Lookup structures for the per-resource queues. Server queues are spread over a fixed number
// of buckets (each bucket holds an AvlTree root); tables, peers and meta each use a single
// AvlTree root, which is null while the tree is empty.
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;

private final SchemaLocking locking;

/**
 * Creates the scheduler.
 * @param procedureRetriever handed to the {@link SchemaLocking} instance this scheduler owns
 */
public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
  locking = new SchemaLocking(procedureRetriever);
}

/**
 * Hands the procedure back to the scheduler by calling {@code push(proc, false, true)}.
 */
@Override
public void yield(final Procedure proc) {
  push(proc, false, true);
}

/**
 * Routes the procedure to the queue that matches its type. The type checks are ordered: meta
 * first, then table, server and peer.
 * @param proc the procedure to queue
 * @param addFront forwarded to the queue's {@code add}
 * @throws UnsupportedOperationException if the procedure implements none of the known interfaces
 */
@Override
protected void enqueue(final Procedure proc, final boolean addFront) {
  if (isMetaProcedure(proc)) {
    doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
  } else if (isTableProcedure(proc)) {
    doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
  } else if (isServerProcedure(proc)) {
    ServerProcedureInterface spi = (ServerProcedureInterface) proc;
    doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
  } else if (isPeerProcedure(proc)) {
    doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
  } else {
    // TODO: at the moment we only have Table and Server procedures
    // if you are implementing a non-table/non-server procedure, you have two options: create
    // a group for all the non-table/non-server procedures or try to find a key for your
    // non-table/non-server procedures and implement something similar to the TableRunQueue.
    throw new UnsupportedOperationException(
      "RQs for non-table/non-server procedures are not implemented yet: " + proc);
  }
}

/**
 * Adds the procedure to its queue and, when the procedure may be runnable, puts the queue back
 * into the given run queue.
 */
private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
    Procedure<?> proc, boolean addFront) {
  queue.add(proc, addFront);
  // For the following conditions, we will put the queue back into execution
  // 1. The procedure has already held the lock, or the lock has been restored when restarting,
  // which means it can be executed immediately.
  // 2. The exclusive lock for this queue has not been held.
  // 3. The given procedure has the exclusive lock permission for this queue.
  Supplier<String> reason = null;
  if (proc.hasLock()) {
    reason = () -> proc + " has lock";
  } else if (proc.isLockedWhenLoading()) {
    reason = () -> proc + " restores lock when restarting";
  } else if (!queue.getLockStatus().hasExclusiveLock()) {
    reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
  } else if (queue.getLockStatus().hasLockAccess(proc)) {
    reason = () -> proc + " has the excusive lock access";
  }
  if (reason != null) {
    addToRunQueue(fairq, queue, reason);
  }
}

/**
 * @return true if any of the meta, table, server or peer run queues has runnables
 */
@Override
protected boolean queueHasRunnables() {
  return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
    serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
}

/**
 * Polls the run queues in priority order (meta, server, peer, table) and returns the first
 * procedure found, or null if none of them yields one.
 */
@Override
protected Procedure dequeue() {
  // meta procedure is always the first priority
  Procedure<?> pollResult = doPoll(metaRunQueue);
  // For now, let server handling have precedence over table handling; presumption is that it
  // is more important handling crashed servers than it is running the
  // enabling/disabling tables, etc.
  if (pollResult == null) {
    pollResult = doPoll(serverRunQueue);
  }
  if (pollResult == null) {
    pollResult = doPoll(peerRunQueue);
  }
  if (pollResult == null) {
    pollResult = doPoll(tableRunQueue);
  }
  return pollResult;
}

/**
 * Tells whether the procedure could take the lock of the given queue right now.
 */
private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
  LockStatus s = rq.getLockStatus();
  // if we have the lock access, we are ready
  if (s.hasLockAccess(proc)) {
    return true;
  }
  boolean xlockReq = rq.requireExclusiveLock(proc);
  // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
  // the shared lock, otherwise, we just need to make sure that no one holds the xlock
  return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
}

/**
 * Polls one queue from the run queue and returns the first of its procedures whose lock is
 * ready. Procedures that are not ready are re-added at the back of the queue; if none is ready
 * the queue is removed from the run queue and null is returned.
 */
private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
  Queue<T> rq = fairq.poll();
  if (rq == null || !rq.isAvailable()) {
    return null;
  }
  // loop until we find out a procedure which is ready to run, or if we have checked all the
  // procedures, then we give up and remove the queue from run queue.
  for (int i = 0, n = rq.size(); i < n; i++) {
    Procedure<?> proc = rq.poll();
    if (!isLockReady(proc, rq)) {
      // we are not ready to run, add back and try the next procedure
      rq.add(proc, false);
      continue;
    }
    // the queue is empty, remove from run queue
    if (rq.isEmpty()) {
      removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
    }
    return proc;
  }
  // no procedure is ready for execution, remove from run queue
  removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
  return null;
}

/**
 * Returns the locks reported by the schema locking, read under the scheduler lock.
 */
@Override
public List<LockedResource> getLocks() {
  schedLock();
  try {
    return locking.getLocks();
  } finally {
    schedUnlock();
  }
}

/**
 * Looks up a single locked resource by type and name, under the scheduler lock.
 */
@Override
public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
  schedLock();
  try {
    return locking.getLockResource(resourceType, resourceName);
  } finally {
    schedUnlock();
  }
}

/**
 * Drops every queue and then clears the schema locking, under the scheduler lock.
 */
@Override
public void clear() {
  schedLock();
  try {
    clearQueue();
    locking.clear();
  } finally {
    schedUnlock();
  }
}

/**
 * Empties every queue tree (servers, tables, peers and meta) and unlinks the queues from their
 * run queues.
 */
private void clearQueue() {
  // Remove Servers
  for (int i = 0; i < serverBuckets.length; ++i) {
    clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
    serverBuckets[i] = null;
  }

  // Remove Tables
  clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
  tableMap = null;

  // Remove Peers
  clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
  peerMap = null;

  // Remove Meta. queueSize() counts metaMap as well, so leaving it behind would keep stale meta
  // procedures queued and could trip the assertion below.
  clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
  metaMap = null;

  assert size() == 0 : "expected queue size to be 0, got " + size();
}

/**
 * Removes the nodes of the given tree one at a time, unlinking each from the run queue when a
 * run queue is supplied.
 */
private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
  while (treeMap != null) {
    Queue<T> node = AvlTree.getFirst(treeMap);
    treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
    if (fairq != null) {
      removeFromRunQueue(fairq, node, () -> "clear all queues");
    }
  }
}

/**
 * Sums the sizes of all queues reachable by iterating the tree rooted at {@code head}.
 */
private int queueSize(Queue<?> head) {
  int count = 0;
  AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
  while (iter.hasNext()) {
    count += iter.next().size();
  }
  return count;
}

/**
 * @return the number of procedures queued across the server, table, peer and meta queues
 */
@Override
protected int queueSize() {
  int count = 0;
  for (ServerQueue serverMap : serverBuckets) {
    count += queueSize(serverMap);
  }
  count += queueSize(tableMap);
  count += queueSize(peerMap);
  count += queueSize(metaMap);
  return count;
}

/**
 * Cleans up the queue of a completed procedure. For table procedures the table queue is only
 * dropped when the table is considered deleted; peer and server queues go through their
 * try-cleanup helpers. Other procedure types need no cleanup yet.
 */
@Override
public void completionCleanup(final Procedure proc) {
  if (proc instanceof TableProcedureInterface) {
    TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
    final boolean tableDeleted;
    if (proc.hasException()) {
      Exception procEx = proc.getException().unwrapRemoteException();
      if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
        // create failed because the table already exist
        tableDeleted = !(procEx instanceof TableExistsException);
      } else {
        // the operation failed because the table does not exist
        tableDeleted = (procEx instanceof TableNotFoundException);
      }
    } else {
      // the table was deleted
      tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
    }
    if (tableDeleted) {
      markTableAsDeleted(iProcTable.getTableName(), proc);
    }
  } else if (proc instanceof PeerProcedureInterface) {
    tryCleanupPeerQueue(getPeerId(proc), proc);
  } else if (proc instanceof ServerProcedureInterface) {
    tryCleanupServerQueue(getServerName(proc), proc);
  }
  // No cleanup for other procedure types, yet.
}

/**
 * Links the queue into the run queue unless it is already linked or has nothing queued. The
 * reason is only evaluated when debug logging is enabled.
 */
private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
    Supplier<String> reason) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Add {} to run queue because: {}", queue, reason.get());
  }
  if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
    fairq.add(queue);
  }
}

/**
 * Unlinks the queue from the run queue if it is currently linked. The reason is only evaluated
 * when debug logging is enabled.
 */
private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
    Queue<T> queue, Supplier<String> reason) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Remove {} from run queue because: {}", queue, reason.get());
  }
  if (AvlIterableList.isLinked(queue)) {
    fairq.remove(queue);
  }
}

// ============================================================================
// Table Queue Lookup Helpers
// ============================================================================
/**
 * Returns the queue of the given table, creating and inserting it into the table tree when it
 * does not exist yet. Never returns null.
 */
private TableQueue getTableQueue(TableName tableName) {
  TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
  if (node != null) {
    return node;
  }

  node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
    locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
  tableMap = AvlTree.insert(tableMap, node);
  return node;
}

/**
 * Removes the table's queue from the table tree and drops its table lock.
 */
private void removeTableQueue(TableName tableName) {
  tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
  locking.removeTableLock(tableName);
}

private static boolean isTableProcedure(Procedure<?> proc) {
  return proc instanceof TableProcedureInterface;
}

private static TableName getTableName(Procedure<?> proc) {
  return ((TableProcedureInterface) proc).getTableName();
}

// ============================================================================
// Server Queue Lookup Helpers
// ============================================================================
private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { 399 final int index = getBucketIndex(serverBuckets, serverName.hashCode()); 400 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 401 if (node != null) { 402 return node; 403 } 404 int priority; 405 if (proc != null) { 406 priority = MasterProcedureUtil.getServerPriority(proc); 407 } else { 408 priority = 1; 409 } 410 node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); 411 serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); 412 return node; 413 } 414 415 private void removeServerQueue(ServerName serverName) { 416 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 417 serverBuckets[index] = 418 AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 419 locking.removeServerLock(serverName); 420 } 421 422 private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) { 423 schedLock(); 424 try { 425 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 426 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 427 if (node == null) { 428 return; 429 } 430 431 LockAndQueue lock = locking.getServerLock(serverName); 432 if (node.isEmpty() && lock.tryExclusiveLock(proc)) { 433 removeFromRunQueue(serverRunQueue, node, 434 () -> "clean up server queue after " + proc + " completed"); 435 removeServerQueue(serverName); 436 } 437 } finally { 438 schedUnlock(); 439 } 440 } 441 442 private static int getBucketIndex(Object[] buckets, int hashCode) { 443 return Math.abs(hashCode) % buckets.length; 444 } 445 446 private static boolean isServerProcedure(Procedure<?> proc) { 447 return proc instanceof ServerProcedureInterface; 448 } 449 450 private static ServerName getServerName(Procedure<?> proc) { 451 return ((ServerProcedureInterface)proc).getServerName(); 452 } 453 454 // 
============================================================================ 455 // Peer Queue Lookup Helpers 456 // ============================================================================ 457 private PeerQueue getPeerQueue(String peerId) { 458 PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 459 if (node != null) { 460 return node; 461 } 462 node = new PeerQueue(peerId, locking.getPeerLock(peerId)); 463 peerMap = AvlTree.insert(peerMap, node); 464 return node; 465 } 466 467 private void removePeerQueue(String peerId) { 468 peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 469 locking.removePeerLock(peerId); 470 } 471 472 private void tryCleanupPeerQueue(String peerId, Procedure procedure) { 473 schedLock(); 474 try { 475 PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 476 if (queue == null) { 477 return; 478 } 479 480 final LockAndQueue lock = locking.getPeerLock(peerId); 481 if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { 482 removeFromRunQueue(peerRunQueue, queue, 483 () -> "clean up peer queue after " + procedure + " completed"); 484 removePeerQueue(peerId); 485 } 486 } finally { 487 schedUnlock(); 488 } 489 } 490 491 private static boolean isPeerProcedure(Procedure<?> proc) { 492 return proc instanceof PeerProcedureInterface; 493 } 494 495 private static String getPeerId(Procedure<?> proc) { 496 return ((PeerProcedureInterface) proc).getPeerId(); 497 } 498 499 // ============================================================================ 500 // Meta Queue Lookup Helpers 501 // ============================================================================ 502 private MetaQueue getMetaQueue() { 503 MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR); 504 if (node != null) { 505 return node; 506 } 507 node = new MetaQueue(locking.getMetaLock()); 508 metaMap = AvlTree.insert(metaMap, node); 509 return node; 510 } 511 512 private static 
boolean isMetaProcedure(Procedure<?> proc) { 513 return proc instanceof MetaProcedureInterface; 514 } 515 // ============================================================================ 516 // Table Locking Helpers 517 // ============================================================================ 518 /** 519 * Get lock info for a resource of specified type and name and log details 520 */ 521 private void logLockedResource(LockedResourceType resourceType, String resourceName) { 522 if (!LOG.isDebugEnabled()) { 523 return; 524 } 525 526 LockedResource lockedResource = getLockResource(resourceType, resourceName); 527 if (lockedResource != null) { 528 String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" + 529 lockedResource.getSharedLockCount(); 530 531 Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure(); 532 if (proc != null) { 533 msg += ", exclusively locked by procId=" + proc.getProcId(); 534 } 535 LOG.debug(msg); 536 } 537 } 538 539 /** 540 * Suspend the procedure if the specified table is already locked. 541 * Other operations in the table-queue will be executed after the lock is released. 
542 * @param procedure the procedure trying to acquire the lock 543 * @param table Table to lock 544 * @return true if the procedure has to wait for the table to be available 545 */ 546 public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 547 schedLock(); 548 try { 549 final String namespace = table.getNamespaceAsString(); 550 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 551 final LockAndQueue tableLock = locking.getTableLock(table); 552 if (!namespaceLock.trySharedLock(procedure)) { 553 waitProcedure(namespaceLock, procedure); 554 logLockedResource(LockedResourceType.NAMESPACE, namespace); 555 return true; 556 } 557 if (!tableLock.tryExclusiveLock(procedure)) { 558 namespaceLock.releaseSharedLock(); 559 waitProcedure(tableLock, procedure); 560 logLockedResource(LockedResourceType.TABLE, table.getNameAsString()); 561 return true; 562 } 563 removeFromRunQueue(tableRunQueue, getTableQueue(table), 564 () -> procedure + " held the exclusive lock"); 565 return false; 566 } finally { 567 schedUnlock(); 568 } 569 } 570 571 /** 572 * Wake the procedures waiting for the specified table 573 * @param procedure the procedure releasing the lock 574 * @param table the name of the table that has the exclusive lock 575 */ 576 public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 577 schedLock(); 578 try { 579 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 580 final LockAndQueue tableLock = locking.getTableLock(table); 581 int waitingCount = 0; 582 if (tableLock.releaseExclusiveLock(procedure)) { 583 waitingCount += wakeWaitingProcedures(tableLock); 584 } 585 if (namespaceLock.releaseSharedLock()) { 586 waitingCount += wakeWaitingProcedures(namespaceLock); 587 } 588 addToRunQueue(tableRunQueue, getTableQueue(table), 589 () -> procedure + " released the exclusive lock"); 590 wakePollIfNeeded(waitingCount); 591 } finally { 592 
schedUnlock(); 593 } 594 } 595 596 /** 597 * Suspend the procedure if the specified table is already locked. 598 * other "read" operations in the table-queue may be executed concurrently, 599 * @param procedure the procedure trying to acquire the lock 600 * @param table Table to lock 601 * @return true if the procedure has to wait for the table to be available 602 */ 603 public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) { 604 return waitTableQueueSharedLock(procedure, table) == null; 605 } 606 607 private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) { 608 schedLock(); 609 try { 610 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 611 final LockAndQueue tableLock = locking.getTableLock(table); 612 if (!namespaceLock.trySharedLock(procedure)) { 613 waitProcedure(namespaceLock, procedure); 614 return null; 615 } 616 617 if (!tableLock.trySharedLock(procedure)) { 618 namespaceLock.releaseSharedLock(); 619 waitProcedure(tableLock, procedure); 620 return null; 621 } 622 623 return getTableQueue(table); 624 } finally { 625 schedUnlock(); 626 } 627 } 628 629 /** 630 * Wake the procedures waiting for the specified table 631 * @param procedure the procedure releasing the lock 632 * @param table the name of the table that has the shared lock 633 */ 634 public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) { 635 schedLock(); 636 try { 637 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 638 final LockAndQueue tableLock = locking.getTableLock(table); 639 int waitingCount = 0; 640 if (tableLock.releaseSharedLock()) { 641 addToRunQueue(tableRunQueue, getTableQueue(table), 642 () -> procedure + " released the shared lock"); 643 waitingCount += wakeWaitingProcedures(tableLock); 644 } 645 if (namespaceLock.releaseSharedLock()) { 646 waitingCount += wakeWaitingProcedures(namespaceLock); 647 
} 648 wakePollIfNeeded(waitingCount); 649 } finally { 650 schedUnlock(); 651 } 652 } 653 654 /** 655 * Tries to remove the queue and the table-lock of the specified table. 656 * If there are new operations pending (e.g. a new create), 657 * the remove will not be performed. 658 * @param table the name of the table that should be marked as deleted 659 * @param procedure the procedure that is removing the table 660 * @return true if deletion succeeded, false otherwise meaning that there are 661 * other new operations pending for that table (e.g. a new create). 662 */ 663 @VisibleForTesting 664 boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) { 665 schedLock(); 666 try { 667 final TableQueue queue = getTableQueue(table); 668 final LockAndQueue tableLock = locking.getTableLock(table); 669 if (queue == null) return true; 670 671 if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { 672 // remove the table from the run-queue and the map 673 if (AvlIterableList.isLinked(queue)) { 674 tableRunQueue.remove(queue); 675 } 676 removeTableQueue(table); 677 } else { 678 // TODO: If there are no create, we can drop all the other ops 679 return false; 680 } 681 } finally { 682 schedUnlock(); 683 } 684 return true; 685 } 686 687 // ============================================================================ 688 // Region Locking Helpers 689 // ============================================================================ 690 /** 691 * Suspend the procedure if the specified region is already locked. 
692 * @param procedure the procedure trying to acquire the lock on the region 693 * @param regionInfo the region we are trying to lock 694 * @return true if the procedure has to wait for the regions to be available 695 */ 696 public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 697 return waitRegions(procedure, regionInfo.getTable(), regionInfo); 698 } 699 700 /** 701 * Suspend the procedure if the specified set of regions are already locked. 702 * @param procedure the procedure trying to acquire the lock on the regions 703 * @param table the table name of the regions we are trying to lock 704 * @param regionInfo the list of regions we are trying to lock 705 * @return true if the procedure has to wait for the regions to be available 706 */ 707 public boolean waitRegions(final Procedure<?> procedure, final TableName table, 708 final RegionInfo... regionInfo) { 709 Arrays.sort(regionInfo, RegionInfo.COMPARATOR); 710 schedLock(); 711 try { 712 assert table != null; 713 if (waitTableSharedLock(procedure, table)) { 714 return true; 715 } 716 717 // acquire region xlocks or wait 718 boolean hasLock = true; 719 final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; 720 for (int i = 0; i < regionInfo.length; ++i) { 721 assert regionInfo[i] != null; 722 assert regionInfo[i].getTable() != null; 723 assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure; 724 assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; 725 726 regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName()); 727 if (!regionLocks[i].tryExclusiveLock(procedure)) { 728 LOG.info("Waiting on xlock for {} held by pid={}", procedure, 729 regionLocks[i].getExclusiveLockProcIdOwner()); 730 waitProcedure(regionLocks[i], procedure); 731 hasLock = false; 732 while (i-- > 0) { 733 regionLocks[i].releaseExclusiveLock(procedure); 734 } 735 break; 736 } else { 737 LOG.info("Took xlock for {}", 
procedure);
        }
      }

      if (!hasLock) {
        wakeTableSharedLock(procedure, table);
      }
      return !hasLock;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified region.
   * Convenience form of {@link #wakeRegions(Procedure, TableName, RegionInfo...)} for a single
   * region; the table is taken from {@code regionInfo.getTable()}.
   * @param procedure the procedure that was holding the region
   * @param regionInfo the region the procedure was holding
   */
  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
  }

  /**
   * Wake the procedures waiting for the specified regions.
   * For every region the exclusive lock held by {@code procedure} is released; if the release
   * succeeds and another procedure is queued on that region, exactly one waiter is woken,
   * otherwise the region lock entry is removed. Finally the table shared-lock is released
   * through {@code wakeTableSharedLock}.
   * <p>Note: {@code regionInfo} is sorted in place with {@link RegionInfo#COMPARATOR}, so an
   * array passed by the caller is reordered.
   * @param procedure the procedure that was holding the regions
   * @param table the table the regions belong to (checked by an {@code assert} for each region)
   * @param regionInfo the list of regions the procedure was holding
   */
  public void wakeRegions(final Procedure<?> procedure,final TableName table,
      final RegionInfo... regionInfo) {
    // Sorts the caller-visible array; the sort happens before the scheduler lock is taken.
    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
    schedLock();
    try {
      int numProcs = 0;
      // At most one waiter is collected per region, so regionInfo.length slots are enough.
      final Procedure<?>[] nextProcs = new Procedure[regionInfo.length];
      for (int i = 0; i < regionInfo.length; ++i) {
        assert regionInfo[i].getTable().equals(table);
        // Reference comparison of neighbours in the sorted array; only active with -ea.
        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];

        LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
        if (regionLock.releaseExclusiveLock(procedure)) {
          if (!regionLock.isWaitingQueueEmpty()) {
            // release one procedure at the time since regions has an xlock
            nextProcs[numProcs++] = regionLock.removeFirst();
          } else {
            // Nobody is waiting on this region: drop its lock entry.
            locking.removeRegionLock(regionInfo[i].getEncodedName());
          }
        }
      }

      // awake procedures if any (walked from the last collected waiter to the first)
      for (int i = numProcs - 1; i >= 0; --i) {
        wakeProcedure(nextProcs[i]);
      }
      wakePollIfNeeded(numProcs);
      // release the table shared-lock.
      wakeTableSharedLock(procedure, table);
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  //  Namespace Locking Helpers
  // ============================================================================
  /**
   * Suspend the procedure if the specified namespace is already locked.
   * Two locks are taken in order: first a shared lock on the table lock of
   * {@code TableName.NAMESPACE_TABLE_NAME}, then the exclusive lock on the namespace itself.
   * If the second step fails the shared lock taken in the first step is released again before
   * the procedure is queued on the namespace lock.
   * @see #wakeNamespaceExclusiveLock(Procedure,String)
   * @param procedure the procedure trying to acquire the lock
   * @param namespace Namespace to lock
   * @return true if the procedure has to wait for the namespace to be available
   */
  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
    schedLock();
    try {
      final LockAndQueue systemNamespaceTableLock =
        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
        // Could not share the namespace table: queue on the table lock and report "must wait".
        waitProcedure(systemNamespaceTableLock, procedure);
        logLockedResource(LockedResourceType.TABLE,
          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
        return true;
      }

      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      if (!namespaceLock.tryExclusiveLock(procedure)) {
        // Undo the shared lock acquired above so it is not held while waiting.
        systemNamespaceTableLock.releaseSharedLock();
        waitProcedure(namespaceLock, procedure);
        logLockedResource(LockedResourceType.NAMESPACE, namespace);
        return true;
      }
      return false;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified namespace.
   * Releases the namespace exclusive lock and the shared lock on the table lock of
   * {@code TableName.NAMESPACE_TABLE_NAME}, waking the waiters of each lock whose release call
   * returns true.
   * @see #waitNamespaceExclusiveLock(Procedure,String)
   * @param procedure the procedure releasing the lock
   * @param namespace the namespace that has the exclusive lock
   */
  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      final LockAndQueue systemNamespaceTableLock =
        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
      int waitingCount = 0;
      if (namespaceLock.releaseExclusiveLock(procedure)) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      // NOTE(review): releaseSharedLock() presumably returns true once no shared holder is
      // left -- confirm against LockAndQueue.
      if (systemNamespaceTableLock.releaseSharedLock()) {
        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
          () -> procedure + " released namespace exclusive lock");
        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
      }
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  //  Server Locking Helpers
  // ============================================================================
  /**
   * Try to acquire the exclusive lock on the specified server.
   * On success the server's queue is removed from the server run queue; on failure the
   * procedure is queued on the server lock.
   * @see #wakeServerExclusiveLock(Procedure,ServerName)
   * @param procedure the procedure trying to acquire the lock
   * @param serverName Server to lock
   * @return true if the procedure has to wait for the server to be available
   */
  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
      final ServerName serverName) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getServerLock(serverName);
      if (lock.tryExclusiveLock(procedure)) {
        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
        // so.
        removeFromRunQueue(serverRunQueue,
          getServerQueue(serverName,
            procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
              : null),
          () -> procedure + " held exclusive lock");
        return false;
      }
      waitProcedure(lock, procedure);
      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
      return true;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified server.
   * The server's queue is put back on the server run queue and the procedures queued on the
   * server lock are woken, regardless of what the release call returns.
   * @see #waitServerExclusiveLock(Procedure,ServerName)
   * @param procedure the procedure releasing the lock
   * @param serverName the server that has the exclusive lock
   */
  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getServerLock(serverName);
      // Only SCP will acquire/release server lock so do not need to check the return value here.
      lock.releaseExclusiveLock(procedure);
      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
      // so.
      addToRunQueue(serverRunQueue,
        getServerQueue(serverName,
          procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
            : null), () -> procedure + " released exclusive lock");
      int waitingCount = wakeWaitingProcedures(lock);
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  //  Peer Locking Helpers
  // ============================================================================
  /**
   * Tell whether a peer procedure needs the exclusive peer lock.
   * @param proc the peer procedure to inspect
   * @return true for every peer operation type except {@code PeerOperationType.REFRESH}
   */
  private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
    return proc.getPeerOperationType() != PeerOperationType.REFRESH;
  }

  /**
   * Try to acquire the exclusive lock on the specified peer.
   * On success the peer's queue is removed from the peer run queue; on failure the procedure
   * is queued on the peer lock.
   * @see #wakePeerExclusiveLock(Procedure, String)
   * @param procedure the procedure trying to acquire the lock
   * @param peerId peer to lock
   * @return true if the procedure has to wait for the peer to be available
   */
  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getPeerLock(peerId);
      if (lock.tryExclusiveLock(procedure)) {
        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
          () -> procedure + " held exclusive lock");
        return false;
      }
      waitProcedure(lock, procedure);
      logLockedResource(LockedResourceType.PEER, peerId);
      return true;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified peer.
   * Unlike the server and meta variants, nothing is re-queued or woken unless the release call
   * returns true.
   * @see #waitPeerExclusiveLock(Procedure, String)
   * @param procedure the procedure releasing the lock
   * @param peerId the peer that has the exclusive lock
   */
  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getPeerLock(peerId);
      if (lock.releaseExclusiveLock(procedure)) {
        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
          () -> procedure + " released exclusive lock");
        int waitingCount = wakeWaitingProcedures(lock);
        wakePollIfNeeded(waitingCount);
      }
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  //  Meta Locking Helpers
  // ============================================================================
  /**
   * Try to acquire the exclusive lock on meta.
   * On success the meta queue is removed from the meta run queue; on failure the procedure is
   * queued on the meta lock.
   * @see #wakeMetaExclusiveLock(Procedure)
   * @param procedure the procedure trying to acquire the lock
   * @return true if the procedure has to wait for meta to be available
   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
   *             {@link RecoverMetaProcedure}.
   */
  @Deprecated
  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getMetaLock();
      if (lock.tryExclusiveLock(procedure)) {
        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
        return false;
      }
      waitProcedure(lock, procedure);
      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
      return true;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for meta.
   * The meta queue is put back on the meta run queue and the procedures queued on the meta lock
   * are woken; the return value of the release call is not checked.
   * @see #waitMetaExclusiveLock(Procedure)
   * @param procedure the procedure releasing the lock
   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
   *             {@link RecoverMetaProcedure}.
   */
  @Deprecated
  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getMetaLock();
      lock.releaseExclusiveLock(procedure);
      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
      int waitingCount = wakeWaitingProcedures(lock);
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  /**
   * For debugging. Expensive.
   * Builds the string form of the whole lock table while holding the scheduler lock.
   * @return the {@code toString()} of the scheduler's lock table
   * @throws IOException declared in the signature; nothing in this method body throws it
   *           directly
   */
  @VisibleForTesting
  public String dumpLocks() throws IOException {
    schedLock();
    try {
      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
      return this.locking.toString();
    } finally {
      schedUnlock();
    }
  }
}