/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
import org.apache.hadoop.hbase.procedure2.LockStatus;
import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * ProcedureScheduler for the Master Procedures.
 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
 * that can be executed without having to wait on a lock.
 * Most of the master operations can be executed concurrently, if they
 * are operating on different tables (e.g. two create table procedures can be performed
 * at the same time) or against two different servers; say two servers that crashed at
 * about the same time.
 *
 * <p>Each procedure should implement an Interface providing information for this queue.
 * For example table related procedures should implement TableProcedureInterface.
 * Each procedure will be pushed in its own queue, and based on the operation type
 * we may make smarter decisions: e.g. we can abort all the operations preceding
 * a delete table, or similar.
 *
 * <h4>Concurrency control</h4>
 * Concurrent access to member variables (tableRunQueue, serverRunQueue, metaRunQueue, locking,
 * tableMap, metaMap, serverBuckets) is controlled by schedLock(). This mainly includes:<br>
 * <ul>
 *   <li>
 *     {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue
 *     when:
 *     <ol>
 *       <li>Queue was empty before push (so must have been out of run-queue)</li>
 *       <li>Child procedure is added (which means parent procedure holds exclusive lock, and it
 *           must have moved Queue out of run-queue)</li>
 *     </ol>
 *   </li>
 *   <li>
 *     {@link #poll(long)}: A poll will remove a Queue from run-queue when:
 *     <ol>
 *       <li>Queue becomes empty after poll</li>
 *       <li>Exclusive lock is requested by polled procedure and lock is available (returns the
 *           procedure)</li>
 *       <li>Exclusive lock is requested but lock is not available (returns null)</li>
 *       <li>Polled procedure is child of parent holding exclusive lock and the next procedure is
 *           not a child</li>
 *     </ol>
 *   </li>
 *   <li>
 *     Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
 *     <ol>
 *       <li>Exclusive lock</li>
 *       <li>Last shared lock (in case queue was removed because next procedure in queue required
 *           exclusive lock)</li>
 *     </ol>
 *   </li>
 * </ul>
 */
@InterfaceAudience.Private
public class MasterProcedureScheduler extends AbstractProcedureScheduler {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);

  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
    (n, k) -> n.compareKey((ServerName) k);
  private static final AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
    (n, k) -> n.compareKey((TableName) k);
  private static final AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
    (n, k) -> n.compareKey((TableName) k);

  // Run queues: the per-key queues that currently have something worth polling.
  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();

  // Lookup structures: every known per-key queue, whether or not it is in a run queue.
  // Servers are spread over a fixed number of AVL trees, indexed by getBucketIndex().
  private final ServerQueue[] serverBuckets = new ServerQueue[128];
  private TableQueue tableMap = null;
  private MetaQueue metaMap = null;

  private final SchemaLocking locking;

  /**
   * Creates a scheduler with empty queues.
   * @param procedureRetriever handed to the {@link SchemaLocking} instance this scheduler uses
   *          for all of its lock bookkeeping
   */
  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
    locking = new SchemaLocking(procedureRetriever);
  }

  /**
   * Puts the given procedure back into the scheduler by delegating to
   * {@code push(proc, false, true)}.
   * @param proc the procedure that is yielding
   */
  @Override
  public void yield(final Procedure proc) {
    push(proc, false, true);
  }

  /**
   * Routes the procedure to the meta, table or server queue it belongs to.
   * Meta procedures, and table procedures that target the meta table, always go to the meta
   * queue; that check is done first so it wins over the plain table check.
   * @param proc the procedure to enqueue
   * @param addFront passed through to the target queue's {@code add}
   * @throws UnsupportedOperationException if the procedure is neither a meta, a table nor a
   *           server procedure
   */
  @Override
  protected void enqueue(final Procedure proc, final boolean addFront) {
    if (isMetaProcedure(proc) ||
      (isTableProcedure(proc) && getTableName(proc).equals(TableName.META_TABLE_NAME))) {
      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
    } else if (isTableProcedure(proc)) {
      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
    } else if (isServerProcedure(proc)) {
      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
    } else {
      // TODO: at the moment we only have Table and Server procedures
      // if you are implementing a non-table/non-server procedure, you have two options: create
      // a group for all the non-table/non-server procedures or try to find a key for your
      // non-table/non-server procedures and implement something similar to the TableRunQueue.
      throw new UnsupportedOperationException(
        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
    }
  }

  /**
   * Adds the procedure to its per-key queue and, when the procedure could actually run, makes
   * sure that queue is part of the run queue.
   * @param fairq the run queue the per-key queue belongs to
   * @param queue the per-key queue receiving the procedure
   * @param proc the procedure being added
   * @param addFront passed through to {@code queue.add}
   */
  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
      Procedure<?> proc, boolean addFront) {
    queue.add(proc, addFront);
    // For the following conditions, we will put the queue back into execution
    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
    // which means it can be executed immediately.
    // 2. The exclusive lock for this queue has not been held.
    // 3. The given procedure has the exclusive lock permission for this queue.
    Supplier<String> reason = null;
    if (proc.hasLock()) {
      reason = () -> proc + " has lock";
    } else if (proc.isLockedWhenLoading()) {
      reason = () -> proc + " restores lock when restarting";
    } else if (!queue.getLockStatus().hasExclusiveLock()) {
      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
    } else if (queue.getLockStatus().hasLockAccess(proc)) {
      reason = () -> proc + " has the excusive lock access";
    }
    // A null reason means someone else holds the exclusive lock: leave the queue parked.
    if (reason != null) {
      addToRunQueue(fairq, queue, reason);
    }
  }

  /**
   * @return true if any of the meta, table or server run queues reports runnables
   */
  @Override
  protected boolean queueHasRunnables() {
    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
      serverRunQueue.hasRunnables();
  }

  /**
   * Picks the next procedure to run, trying the meta queue first, then servers, then tables.
   * @param onlyUrgent when true only the meta run queue is consulted
   * @return a procedure ready to run, or null if none was found
   */
  @Override
  protected Procedure dequeue(boolean onlyUrgent) {
    // meta procedure is always the first priority
    Procedure<?> pollResult = doPoll(metaRunQueue);
    if (onlyUrgent) {
      return pollResult;
    }
    // For now, let server handling have precedence over table handling; presumption is that it
    // is more important handling crashed servers than it is running the
    // enabling/disabling tables, etc.
    if (pollResult == null) {
      pollResult = doPoll(serverRunQueue);
    }
    if (pollResult == null) {
      pollResult = doPoll(tableRunQueue);
    }
    return pollResult;
  }

  /**
   * Tells whether the procedure could take the lock of its queue right now.
   * @param proc the candidate procedure
   * @param rq the queue the candidate was polled from
   * @return true if the procedure already has lock access, or the lock state allows the kind of
   *         lock (exclusive or shared) the queue says the procedure requires
   */
  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
    LockStatus s = rq.getLockStatus();
    // if we have the lock access, we are ready
    if (s.hasLockAccess(proc)) {
      return true;
    }
    boolean xlockReq = rq.requireExclusiveLock(proc);
    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
  }

  /**
   * Polls one per-key queue from the given run queue and returns its first procedure that is
   * ready to run.
   * @param fairq the run queue to poll
   * @return a runnable procedure, or null if the run queue yielded no available queue or none of
   *         the polled queue's procedures can run yet
   */
  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
    Queue<T> rq = fairq.poll();
    if (rq == null || !rq.isAvailable()) {
      return null;
    }
    // loop until we find out a procedure which is ready to run, or if we have checked all the
    // procedures, then we give up and remove the queue from run queue.
    // The size is captured up front because rejected procedures are re-added at the tail.
    for (int i = 0, n = rq.size(); i < n; i++) {
      Procedure<?> proc = rq.poll();
      if (isLockReady(proc, rq)) {
        // the queue is empty, remove from run queue
        if (rq.isEmpty()) {
          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
        }
        return proc;
      }
      // we are not ready to run, add back and try the next procedure
      rq.add(proc, false);
    }
    // no procedure is ready for execution, remove from run queue
    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
    return null;
  }

  /**
   * @return the locks reported by the underlying {@link SchemaLocking}, read under the
   *         scheduler lock
   */
  @Override
  public List<LockedResource> getLocks() {
    schedLock();
    try {
      return locking.getLocks();
    } finally {
      schedUnlock();
    }
  }

  /**
   * Looks up a single lock under the scheduler lock.
   * @param resourceType type of the locked resource
   * @param resourceName name of the locked resource
   * @return whatever {@link SchemaLocking} returns for that resource
   */
  @Override
  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
    schedLock();
    try {
      return locking.getLockResource(resourceType, resourceName);
    } finally {
      schedUnlock();
    }
  }

  /**
   * Drops every queue and every lock known to this scheduler.
   */
  @Override
  public void clear() {
    schedLock();
    try {
      clearQueue();
      locking.clear();
    } finally {
      schedUnlock();
    }
  }

  /**
   * Empties the server, table and meta lookup structures and unlinks their queues from the
   * run queues.
   */
  private void clearQueue() {
    // Remove Servers
    for (int i = 0; i < serverBuckets.length; ++i) {
      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
      serverBuckets[i] = null;
    }

    // Remove Tables
    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
    tableMap = null;

    // Remove Meta. queueSize() counts metaMap too, so leaving it populated would both keep
    // stale procedures around and trip the assertion below.
    clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
    metaMap = null;

    assert size() == 0 : "expected queue size to be 0, got " + size();
  }

  /**
   * Removes every node of the given AVL tree, unlinking each one from the run queue.
   * @param treeMap root of the tree to empty, may be null
   * @param fairq run queue the nodes may be linked into, may be null
   * @param comparator key comparator matching the tree's node type
   */
  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
      FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
    while (treeMap != null) {
      Queue<T> node = AvlTree.getFirst(treeMap);
      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
      if (fairq != null) {
        removeFromRunQueue(fairq, node, () -> "clear all queues");
      }
    }
  }

  /**
   * @param head root of an AVL tree of queues, may be null
   * @return the sum of the sizes of all queues reachable from {@code head}
   */
  private int queueSize(Queue<?> head) {
    int count = 0;
    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
    while (iter.hasNext()) {
      count += iter.next().size();
    }
    return count;
  }

  /**
   * @return the number of procedures held across all server, table and meta queues
   */
  @Override
  protected int queueSize() {
    int count = 0;
    for (ServerQueue serverMap : serverBuckets) {
      count += queueSize(serverMap);
    }
    count += queueSize(tableMap);
    count += queueSize(metaMap);
    return count;
  }

  /**
   * Releases scheduler state that is no longer needed once a procedure has completed.
   * For table procedures the table queue is dropped when the outcome implies the table is gone;
   * for server procedures the server queue is dropped if it is empty. Other procedure types
   * need no cleanup yet.
   * @param proc the completed procedure
   */
  @Override
  public void completionCleanup(final Procedure proc) {
    if (proc instanceof TableProcedureInterface) {
      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
      boolean tableDeleted;
      if (proc.hasException()) {
        Exception procEx = proc.getException().unwrapRemoteException();
        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
          // create failed because the table already exist
          tableDeleted = !(procEx instanceof TableExistsException);
        } else {
          // the operation failed because the table does not exist
          tableDeleted = (procEx instanceof TableNotFoundException);
        }
      } else {
        // the table was deleted
        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
      }
      if (tableDeleted) {
        markTableAsDeleted(iProcTable.getTableName(), proc);
      }
    } else if (proc instanceof ServerProcedureInterface) {
      tryCleanupServerQueue(getServerName(proc), proc);
    }
  }

  /**
   * Links the queue into the run queue unless it is already linked or has nothing queued.
   * @param fairq the run queue
   * @param queue the per-key queue to link
   * @param reason lazily evaluated explanation, only used for debug logging
   */
  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
      Supplier<String> reason) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Add {} to run queue because: {}", queue, reason.get());
    }
    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
      fairq.add(queue);
    }
  }

  /**
   * Unlinks the queue from the run queue if it is currently linked.
   * @param fairq the run queue
   * @param queue the per-key queue to unlink
   * @param reason lazily evaluated explanation, only used for debug logging
   */
  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
      Queue<T> queue, Supplier<String> reason) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Remove {} from run queue because: {}", queue, reason.get());
    }
    if (AvlIterableList.isLinked(queue)) {
      fairq.remove(queue);
    }
  }

  // ============================================================================
  //  Table Queue Lookup Helpers
  // ============================================================================
  /**
   * Returns the queue of the given table, creating and registering it on first use.
   * @param tableName the table to look up
   * @return the existing or newly created queue, never null
   */
  private TableQueue getTableQueue(TableName tableName) {
    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
    if (node != null) {
      return node;
    }

    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
    tableMap = AvlTree.insert(tableMap, node);
    return node;
  }

  /**
   * Forgets both the queue and the table lock of the given table.
   * @param tableName the table to remove
   */
  private void removeTableQueue(TableName tableName) {
    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
    locking.removeTableLock(tableName);
  }

  private static boolean isTableProcedure(Procedure proc) {
    return proc instanceof TableProcedureInterface;
  }

  private static TableName getTableName(Procedure proc) {
    return ((TableProcedureInterface) proc).getTableName();
  }

  // ============================================================================
  //  Server Queue Lookup Helpers
  // ============================================================================
  /**
   * Returns the queue of the given server, creating and registering it on first use.
   * @param serverName the server to look up
   * @param proc used to derive the priority of a newly created queue; may be null, in which
   *          case priority 1 is used
   * @return the existing or newly created queue, never null
   */
  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
    if (node != null) {
      return node;
    }
    int priority;
    if (proc != null) {
      priority = MasterProcedureUtil.getServerPriority(proc);
    } else {
      priority = 1;
    }
    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
    return node;
  }

  /**
   * Forgets both the queue and the server lock of the given server.
   * @param serverName the server to remove
   */
  private void removeServerQueue(ServerName serverName) {
    int index = getBucketIndex(serverBuckets, serverName.hashCode());
    serverBuckets[index] =
      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
    locking.removeServerLock(serverName);
  }

  /**
   * Removes the server queue if it exists, is empty and its exclusive lock can be taken by the
   * given procedure. Unlike {@link #getServerQueue(ServerName, ServerProcedureInterface)} this
   * never creates a queue.
   * @param serverName the server whose queue should be cleaned up
   * @param proc the procedure that just completed
   */
  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
    schedLock();
    try {
      int index = getBucketIndex(serverBuckets, serverName.hashCode());
      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
      if (node == null) {
        return;
      }

      LockAndQueue lock = locking.getServerLock(serverName);
      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
        removeFromRunQueue(serverRunQueue, node,
          () -> "clean up server queue after " + proc + " completed");
        removeServerQueue(serverName);
      }
    } finally {
      schedUnlock();
    }
  }

  /**
   * Maps a hash code to a valid index of the given bucket array.
   * {@code Math.floorMod} is used rather than {@code Math.abs(hashCode) % length} because
   * {@code Math.abs(Integer.MIN_VALUE)} is still negative and would yield a negative index.
   * @param buckets the bucket array, must not be empty
   * @param hashCode any int, including negative values
   * @return an index in {@code [0, buckets.length)}
   */
  private static int getBucketIndex(Object[] buckets, int hashCode) {
    return Math.floorMod(hashCode, buckets.length);
  }

  private static boolean isServerProcedure(Procedure proc) {
    return proc instanceof ServerProcedureInterface;
  }

  private static ServerName getServerName(Procedure proc) {
    return ((ServerProcedureInterface) proc).getServerName();
  }

  // ============================================================================
  //  Meta Queue Lookup Helpers
  // ============================================================================
  /**
   * Returns the single meta queue, creating and registering it on first use.
   * @return the existing or newly created queue, never null
   */
  private MetaQueue getMetaQueue() {
    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
    if (node != null) {
      return node;
    }
    node = new MetaQueue(locking.getMetaLock());
    metaMap = AvlTree.insert(metaMap, node);
    return node;
  }

  private static boolean isMetaProcedure(Procedure<?> proc) {
    return proc instanceof MetaProcedureInterface;
  }

  // ============================================================================
  //  Table Locking Helpers
  // ============================================================================
  /**
   * Get lock info for a resource of specified type and name and log details.
   * Does nothing unless debug logging is enabled.
   */
  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
    if (!LOG.isDebugEnabled()) {
      return;
    }

    LockedResource lockedResource = getLockResource(resourceType, resourceName);
    if (lockedResource != null) {
      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" +
        lockedResource.getSharedLockCount();

      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
      if (proc != null) {
        msg += ", exclusively locked by procId=" + proc.getProcId();
      }
      LOG.debug(msg);
    }
  }

  /**
   * Suspend the procedure if the specified table is already locked.
   * Other operations in the table-queue will be executed after the lock is released.
   * The namespace shared lock is taken first and given back if the table lock cannot be taken.
   * @param procedure the procedure trying to acquire the lock
   * @param table Table to lock
   * @return true if the procedure has to wait for the table to be available
   */
  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final String namespace = table.getNamespaceAsString();
      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      final LockAndQueue tableLock = locking.getTableLock(table);
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        logLockedResource(LockedResourceType.NAMESPACE, namespace);
        return true;
      }
      if (!tableLock.tryExclusiveLock(procedure)) {
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
        return true;
      }
      removeFromRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " held the exclusive lock");
      return false;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified table
   * @param procedure the procedure releasing the lock
   * @param table the name of the table that has the exclusive lock
   */
  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      int waitingCount = 0;
      if (tableLock.releaseExclusiveLock(procedure)) {
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      addToRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " released the exclusive lock");
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  /**
   * Suspend the procedure if the specified table is already locked.
   * Other "read" operations in the table-queue may be executed concurrently.
   * @param procedure the procedure trying to acquire the lock
   * @param table Table to lock
   * @return true if the procedure has to wait for the table to be available
   */
  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
    return waitTableQueueSharedLock(procedure, table) == null;
  }

  /**
   * Tries to take the namespace and table shared locks for the procedure.
   * @param procedure the procedure trying to acquire the lock
   * @param table Table to lock
   * @return the table queue if both locks were taken, or null if the procedure was suspended
   */
  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        return null;
      }

      if (!tableLock.trySharedLock(procedure)) {
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        return null;
      }

      return getTableQueue(table);
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified table
   * @param procedure the procedure releasing the lock
   * @param table the name of the table that has the shared lock
   */
  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      int waitingCount = 0;
      if (tableLock.releaseSharedLock()) {
        addToRunQueue(tableRunQueue, getTableQueue(table),
          () -> procedure + " released the shared lock");
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  /**
   * Tries to remove the queue and the table-lock of the specified table.
   * If there are new operations pending (e.g. a new create),
   * the remove will not be performed.
   * @param table the name of the table that should be marked as deleted
   * @param procedure the procedure that is removing the table
   * @return true if deletion succeeded, false otherwise meaning that there are
   *     other new operations pending for that table (e.g. a new create).
   */
  @VisibleForTesting
  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
    schedLock();
    try {
      final TableQueue queue = getTableQueue(table);
      final LockAndQueue tableLock = locking.getTableLock(table);
      // getTableQueue() creates the queue on demand, so this guard is purely defensive.
      if (queue == null) {
        return true;
      }

      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
        // remove the table from the run-queue and the map
        if (AvlIterableList.isLinked(queue)) {
          tableRunQueue.remove(queue);
        }
        removeTableQueue(table);
      } else {
        // TODO: If there are no create, we can drop all the other ops
        return false;
      }
    } finally {
      schedUnlock();
    }
    return true;
  }

  // ============================================================================
  //  Region Locking Helpers
  // ============================================================================
  /**
   * Suspend the procedure if the specified region is already locked.
   * @param procedure the procedure trying to acquire the lock on the region
   * @param regionInfo the region we are trying to lock
   * @return true if the procedure has to wait for the regions to be available
   */
  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
  }

  /**
   * Suspend the procedure if the specified set of regions are already locked.
   * Note that the passed array is sorted in place. Either every region lock is taken, or none
   * is: on the first failure the locks taken so far and the table shared lock are released.
   * @param procedure the procedure trying to acquire the lock on the regions
   * @param table the table name of the regions we are trying to lock
   * @param regionInfo the list of regions we are trying to lock
   * @return true if the procedure has to wait for the regions to be available
   */
  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
      final RegionInfo... regionInfo) {
    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
    schedLock();
    try {
      assert table != null;
      if (waitTableSharedLock(procedure, table)) {
        return true;
      }

      // acquire region xlocks or wait
      boolean hasLock = true;
      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
      for (int i = 0; i < regionInfo.length; ++i) {
        assert regionInfo[i] != null;
        assert regionInfo[i].getTable() != null;
        assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];

        regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName());
        if (!regionLocks[i].tryExclusiveLock(procedure)) {
          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
            regionLocks[i].getExclusiveLockProcIdOwner());
          waitProcedure(regionLocks[i], procedure);
          hasLock = false;
          // roll back the region locks acquired before the one that failed
          while (i-- > 0) {
            regionLocks[i].releaseExclusiveLock(procedure);
          }
          break;
        } else {
          LOG.info("Took xlock for {}", procedure);
        }
      }

      if (!hasLock) {
        wakeTableSharedLock(procedure, table);
      }
      return !hasLock;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified region
   * @param procedure the procedure that was holding the region
   * @param regionInfo the region the procedure was holding
   */
  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
  }

  /**
   * Wake the procedures waiting for the specified regions.
   * Note that the passed array is sorted in place.
   * @param procedure the procedure that was holding the regions
   * @param table the table name of the regions being released
   * @param regionInfo the list of regions the procedure was holding
   */
  public void wakeRegions(final Procedure<?> procedure, final TableName table,
      final RegionInfo... regionInfo) {
    Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
    schedLock();
    try {
      int numProcs = 0;
      final Procedure<?>[] nextProcs = new Procedure[regionInfo.length];
      for (int i = 0; i < regionInfo.length; ++i) {
        assert regionInfo[i].getTable().equals(table);
        assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];

        LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
        if (regionLock.releaseExclusiveLock(procedure)) {
          if (!regionLock.isWaitingQueueEmpty()) {
            // release one procedure at the time since regions has an xlock
            nextProcs[numProcs++] = regionLock.removeFirst();
          } else {
            locking.removeRegionLock(regionInfo[i].getEncodedName());
          }
        }
      }

      // awake procedures if any
      for (int i = numProcs - 1; i >= 0; --i) {
        wakeProcedure(nextProcs[i]);
      }
      wakePollIfNeeded(numProcs);
      // release the table shared-lock.
      wakeTableSharedLock(procedure, table);
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  //  Namespace Locking Helpers
  // ============================================================================
  /**
   * Suspend the procedure if the specified namespace is already locked.
   * A shared lock on the namespace system table is taken first and given back if the namespace
   * lock cannot be taken.
   * @see #wakeNamespaceExclusiveLock(Procedure,String)
   * @param procedure the procedure trying to acquire the lock
   * @param namespace Namespace to lock
   * @return true if the procedure has to wait for the namespace to be available
   */
  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
    schedLock();
    try {
      final LockAndQueue systemNamespaceTableLock =
        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
        waitProcedure(systemNamespaceTableLock, procedure);
        logLockedResource(LockedResourceType.TABLE,
          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
        return true;
      }

      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      if (!namespaceLock.tryExclusiveLock(procedure)) {
        systemNamespaceTableLock.releaseSharedLock();
        waitProcedure(namespaceLock, procedure);
        logLockedResource(LockedResourceType.NAMESPACE, namespace);
        return true;
      }
      return false;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified namespace
   * @see #waitNamespaceExclusiveLock(Procedure,String)
   * @param procedure the procedure releasing the lock
   * @param namespace the namespace that has the exclusive lock
   */
  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      final LockAndQueue systemNamespaceTableLock =
        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
      int waitingCount = 0;
      if (namespaceLock.releaseExclusiveLock(procedure)) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      if (systemNamespaceTableLock.releaseSharedLock()) {
        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
          () -> procedure + " released namespace exclusive lock");
        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
      }
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  //  Server Locking Helpers
  // ============================================================================
  /**
   * Try to acquire the exclusive lock on the specified server.
   * @see #wakeServerExclusiveLock(Procedure,ServerName)
   * @param procedure the procedure trying to acquire the lock
   * @param serverName Server to lock
   * @return true if the procedure has to wait for the server to be available
   */
  public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getServerLock(serverName);
      if (lock.tryExclusiveLock(procedure)) {
        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
        // so.
        removeFromRunQueue(serverRunQueue,
          getServerQueue(serverName,
            procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
              : null),
          () -> procedure + " held exclusive lock");
        return false;
      }
      waitProcedure(lock, procedure);
      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
      return true;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified server
   * @see #waitServerExclusiveLock(Procedure,ServerName)
   * @param procedure the procedure releasing the lock
   * @param serverName the server that has the exclusive lock
   */
  public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getServerLock(serverName);
      // Only SCP will acquire/release server lock so do not need to check the return value here.
      lock.releaseExclusiveLock(procedure);
      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
      // so.
      addToRunQueue(serverRunQueue,
        getServerQueue(serverName,
          procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
            : null), () -> procedure + " released exclusive lock");
      int waitingCount = wakeWaitingProcedures(lock);
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  //  Meta Locking Helpers
  // ============================================================================
  /**
   * Try to acquire the exclusive lock on meta.
   * @see #wakeMetaExclusiveLock(Procedure)
   * @param procedure the procedure trying to acquire the lock
   * @return true if the procedure has to wait for meta to be available
   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
   *             {@link RecoverMetaProcedure}.
   */
  @Deprecated
  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getMetaLock();
      if (lock.tryExclusiveLock(procedure)) {
        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
        return false;
      }
      waitProcedure(lock, procedure);
      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
      return true;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for meta.
   * @see #waitMetaExclusiveLock(Procedure)
   * @param procedure the procedure releasing the lock
   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
   *             {@link RecoverMetaProcedure}.
   */
  @Deprecated
  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
    schedLock();
    try {
      final LockAndQueue lock = locking.getMetaLock();
      lock.releaseExclusiveLock(procedure);
      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
      int waitingCount = wakeWaitingProcedures(lock);
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  /**
   * For debugging. Expensive.
   * @return the string form of the underlying {@link SchemaLocking}, read under the scheduler
   *         lock
   */
  @VisibleForTesting
  public String dumpLocks() throws IOException {
    schedLock();
    try {
      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
      return this.locking.toString();
    } finally {
      schedUnlock();
    }
  }
}