001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.function.Function; 024import java.util.function.Supplier; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.TableExistsException; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.TableNotFoundException; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; 031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; 032import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; 033import org.apache.hadoop.hbase.procedure2.LockAndQueue; 034import org.apache.hadoop.hbase.procedure2.LockStatus; 035import org.apache.hadoop.hbase.procedure2.LockedResource; 036import org.apache.hadoop.hbase.procedure2.LockedResourceType; 037import org.apache.hadoop.hbase.procedure2.Procedure; 038import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 039import 
org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; 040import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; 041import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 047 048/** 049 * ProcedureScheduler for the Master Procedures. 050 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures 051 * that can be executed without having to wait on a lock. 052 * Most of the master operations can be executed concurrently, if they 053 * are operating on different tables (e.g. two create table procedures can be performed 054 * at the same time) or against two different servers; say two servers that crashed at 055 * about the same time. 056 * 057 * <p>Each procedure should implement an Interface providing information for this queue. 058 * For example table related procedures should implement TableProcedureInterface. 059 * Each procedure will be pushed in its own queue, and based on the operation type 060 * we may make smarter decisions: e.g. we can abort all the operations preceding 061 * a delete table, or similar. 062 * 063 * <h4>Concurrency control</h4> 064 * Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap, 065 * serverBuckets) is controlled by schedLock(). 
This mainly includes:<br> 066 * <ul> 067 * <li> 068 * {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue 069 * when: 070 * <ol> 071 * <li>Queue was empty before push (so must have been out of run-queue)</li> 072 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it 073 * must have moved Queue out of run-queue)</li> 074 * </ol> 075 * </li> 076 * <li> 077 * {@link #poll(long)}: A poll will remove a Queue from run-queue when: 078 * <ol> 079 * <li>Queue becomes empty after poll</li> 080 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the 081 * procedure)</li> 082 * <li>Exclusive lock is requested but lock is not available (returns null)</li> 083 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is 084 * not a child</li> 085 * </ol> 086 * </li> 087 * <li> 088 * Namespace/table/region locks: Queue is added back to run-queue when lock being released is: 089 * <ol> 090 * <li>Exclusive lock</li> 091 * <li>Last shared lock (in case queue was removed because next procedure in queue required 092 * exclusive lock)</li> 093 * </ol> 094 * </li> 095 * </ul> 096 */ 097@InterfaceAudience.Private 098public class MasterProcedureScheduler extends AbstractProcedureScheduler { 099 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); 100 101 private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR = 102 (n, k) -> n.compareKey((ServerName) k); 103 private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR = 104 (n, k) -> n.compareKey((TableName) k); 105 private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR = 106 (n, k) -> n.compareKey((String) k); 107 private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = 108 (n, k) -> n.compareKey((TableName) k); 109 110 private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); 
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();

// Server queues live in a fixed array of buckets; each slot is the root of an AVL tree and the
// slot is picked from the server name hash code (see getBucketIndex).
private final ServerQueue[] serverBuckets = new ServerQueue[128];
// Roots of the AVL trees holding the table, peer and meta queues (null when empty).
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;

private final SchemaLocking locking;

/**
 * Creates the scheduler.
 * @param procedureRetriever function passed to the {@code SchemaLocking} instance created for
 *          this scheduler
 */
public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
  locking = new SchemaLocking(procedureRetriever);
}

/**
 * Puts the procedure back in the scheduler by delegating to {@code push(proc, false, true)}.
 * @param proc the procedure that yields
 */
@Override
public void yield(final Procedure proc) {
  push(proc, false, true);
}

/**
 * Adds the procedure to the queue that matches its kind. The checks are made in this order:
 * meta procedure (or table procedure on the meta table), table procedure, server procedure,
 * peer procedure.
 * @param proc the procedure to enqueue
 * @param addFront forwarded to the target queue's {@code add}
 * @throws UnsupportedOperationException if the procedure is of none of the kinds above
 */
@Override
protected void enqueue(final Procedure proc, final boolean addFront) {
  if (isMetaProcedure(proc) ||
    (isTableProcedure(proc) && getTableName(proc).equals(TableName.META_TABLE_NAME))) {
    doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
  } else if (isTableProcedure(proc)) {
    doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
  } else if (isServerProcedure(proc)) {
    ServerProcedureInterface spi = (ServerProcedureInterface) proc;
    doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
  } else if (isPeerProcedure(proc)) {
    doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
  } else {
    // TODO: only meta, table, server and peer procedures are handled here.
    // If you are implementing another kind of procedure, you have two options: create
    // a group for all the other procedures or try to find a key for your
    // procedures and implement something similar to the TableRunQueue.
    throw new UnsupportedOperationException(
      "RQs for non-table/non-server procedures are not implemented yet: " + proc);
  }
}

/**
 * Adds the procedure to {@code queue} and, when the procedure may run, asks for the queue to be
 * put in the run queue {@code fairq}.
 * @param fairq the run queue the procedure queue belongs to
 * @param queue the per-resource queue receiving the procedure
 * @param proc the procedure to add
 * @param addFront forwarded to {@code queue.add}
 */
private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
    Procedure<?> proc, boolean addFront) {
  queue.add(proc, addFront);
  // For the following conditions, we will put the queue back into execution
  // 1. The procedure has already held the lock, or the lock has been restored when restarting,
  // which means it can be executed immediately.
  // 2. The exclusive lock for this queue has not been held.
  // 3. The given procedure has the exclusive lock permission for this queue.
  Supplier<String> reason = null;
  if (proc.hasLock()) {
    reason = () -> proc + " has lock";
  } else if (proc.isLockedWhenLoading()) {
    reason = () -> proc + " restores lock when restarting";
  } else if (!queue.getLockStatus().hasExclusiveLock()) {
    reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
  } else if (queue.getLockStatus().hasLockAccess(proc)) {
    reason = () -> proc + " has the excusive lock access";
  }
  // reason stays null when none of the conditions holds; the queue is then left as it is.
  if (reason != null) {
    addToRunQueue(fairq, queue, reason);
  }
}

/**
 * @return true if any of the meta, table, server or peer run queues reports runnables
 */
@Override
protected boolean queueHasRunnables() {
  return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
    serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
}

/**
 * Polls the run queues in priority order: meta, then server, then peer, then table.
 * @param onlyUrgent when true only the meta run queue is polled
 * @return the procedure to run, or null if none was found
 */
@Override
protected Procedure dequeue(boolean onlyUrgent) {
  // meta procedure is always the first priority
  Procedure<?> pollResult = doPoll(metaRunQueue);
  if (onlyUrgent) {
    return pollResult;
  }
  // For now, let server handling have precedence over table handling; presumption is that it
  // is more important handling crashed servers than it is running the
  // enabling/disabling tables, etc.
  if (pollResult == null) {
    pollResult = doPoll(serverRunQueue);
  }
  if (pollResult == null) {
    pollResult = doPoll(peerRunQueue);
  }
  if (pollResult == null) {
    pollResult = doPoll(tableRunQueue);
  }
  return pollResult;
}

/**
 * Tells whether the procedure could take the lock it needs on the given queue right now.
 * @param proc the candidate procedure
 * @param rq the queue the procedure was polled from
 * @return true if the procedure already has lock access, or the lock it requires is free
 */
private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
  LockStatus s = rq.getLockStatus();
  // if we have the lock access, we are ready
  if (s.hasLockAccess(proc)) {
    return true;
  }
  boolean xlockReq = rq.requireExclusiveLock(proc);
  // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
  // the shared lock, otherwise, we just need to make sure that no one holds the xlock
  return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
}

/**
 * Polls one queue out of the given run queue and returns its first procedure that is ready to
 * run. Procedures that are not ready are appended back to the queue.
 * @param fairq the run queue to poll
 * @return a runnable procedure, or null if the run queue gave no available queue or no
 *         procedure of the polled queue is ready
 */
private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
  Queue<T> rq = fairq.poll();
  if (rq == null || !rq.isAvailable()) {
    return null;
  }
  // loop until we find out a procedure which is ready to run, or if we have checked all the
  // procedures, then we give up and remove the queue from run queue.
  // The size is captured once: entries re-added below must not be visited a second time.
  for (int i = 0, n = rq.size(); i < n; i++) {
    Procedure<?> proc = rq.poll();
    if (isLockReady(proc, rq)) {
      // the queue is empty, remove from run queue
      if (rq.isEmpty()) {
        removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
      }
      return proc;
    }
    // we are not ready to run, add back and try the next procedure
    rq.add(proc, false);
  }
  // no procedure is ready for execution, remove from run queue
  removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
  return null;
}

/**
 * @return the locks reported by the schema locking, read under the scheduler lock
 */
@Override
public List<LockedResource> getLocks() {
  schedLock();
  try {
    return locking.getLocks();
  } finally {
    schedUnlock();
  }
}

/**
 * @param resourceType the type of the resource
 * @param resourceName the name of the resource
 * @return the lock information reported by the schema locking for that resource, read under
 *         the scheduler lock
 */
@Override
public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
  schedLock();
  try {
    return locking.getLockResource(resourceType, resourceName);
  } finally {
    schedUnlock();
  }
}

/**
 * Drops every queue and clears the schema locking, under the scheduler lock.
 */
@Override
public void clear() {
  schedLock();
  try {
    clearQueue();
    locking.clear();
  } finally {
    schedUnlock();
  }
}

/**
 * Empties the server, table, peer and meta queue trees and unlinks their queues from the
 * matching run queues.
 */
private void clearQueue() {
  // Remove Servers
  for (int i = 0; i < serverBuckets.length; ++i) {
    clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
    serverBuckets[i] = null;
  }

  // Remove Tables
  clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
  tableMap = null;

  // Remove Peers
  clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
  peerMap = null;

  // Remove Meta: queueSize() counts metaMap too, so it has to be dropped as well or the
  // assertion below fails whenever a meta procedure is still queued.
  clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
  metaMap = null;

  assert size() == 0 : "expected queue size to be 0, got " + size();
}

/**
 * Removes every node of the given tree, unlinking each one from the run queue.
 * @param treeMap root of the tree to empty, may be null
 * @param fairq run queue the nodes may be linked in, may be null
 * @param comparator key comparator of the tree
 */
private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
  while (treeMap != null) {
    Queue<T> node = AvlTree.getFirst(treeMap);
    treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
    if (fairq != null) {
      removeFromRunQueue(fairq, node, () -> "clear all queues");
    }
  }
}

/**
 * @param head root of a queue tree, may be null
 * @return the sum of the sizes of all the queues of the tree
 */
private int queueSize(Queue<?> head) {
  int count = 0;
  AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
  while (iter.hasNext()) {
    count += iter.next().size();
  }
  return count;
}

/**
 * @return the number of procedures held by the server, table, peer and meta queues
 */
@Override
protected int queueSize() {
  int count = 0;
  for (ServerQueue serverMap : serverBuckets) {
    count += queueSize(serverMap);
  }
  count += queueSize(tableMap);
  count += queueSize(peerMap);
  count += queueSize(metaMap);
  return count;
}

/**
 * Cleans up the queue of the resource the completed procedure worked on. For table procedures
 * the table queue is only considered when the table is deemed deleted; for peer and server
 * procedures a cleanup of the corresponding queue is attempted. Other procedure kinds are
 * ignored.
 * @param proc the completed procedure
 */
@Override
public void completionCleanup(final Procedure proc) {
  if (proc instanceof TableProcedureInterface) {
    TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
    boolean tableDeleted;
    if (proc.hasException()) {
      Exception procEx = proc.getException().unwrapRemoteException();
      if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
        // create failed because the table already exist
        tableDeleted = !(procEx instanceof TableExistsException);
      } else {
        // the operation failed because the table does not exist
        tableDeleted = (procEx instanceof TableNotFoundException);
      }
    } else {
      // the table was deleted
      tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
    }
    if (tableDeleted) {
      markTableAsDeleted(iProcTable.getTableName(), proc);
    }
  } else if (proc instanceof PeerProcedureInterface) {
    tryCleanupPeerQueue(getPeerId(proc), proc);
  } else if (proc instanceof ServerProcedureInterface) {
    tryCleanupServerQueue(getServerName(proc), proc);
  }
  // No cleanup for other procedure types, yet.
}

/**
 * Links the queue in the run queue, unless it is already linked or it is empty. The reason is
 * only evaluated for the debug log, which is written whether or not the queue gets linked.
 */
private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
    Supplier<String> reason) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Add {} to run queue because: {}", queue, reason.get());
  }
  if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
    fairq.add(queue);
  }
}

/**
 * Unlinks the queue from the run queue if it is linked. The reason is only evaluated for the
 * debug log, which is written whether or not the queue was linked.
 */
private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
    Queue<T> queue, Supplier<String> reason) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Remove {} from run queue because: {}", queue, reason.get());
  }
  if (AvlIterableList.isLinked(queue)) {
    fairq.remove(queue);
  }
}

// ============================================================================
// Table Queue Lookup Helpers
// ============================================================================
/**
 * Looks up the queue of the table, creating and registering it when it does not exist yet.
 * @param tableName the table
 * @return the existing or newly created queue, never null
 */
private TableQueue getTableQueue(TableName tableName) {
  TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
  if (node != null) {
    return node;
  }

  node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
  tableMap = AvlTree.insert(tableMap, node);
  return node;
}

/**
 * Removes the queue of the table from the table tree, and the table lock from the locking.
 */
private void removeTableQueue(TableName tableName) {
  tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
  locking.removeTableLock(tableName);
}

private static boolean isTableProcedure(Procedure<?> proc) {
  return proc instanceof TableProcedureInterface;
}

// Callers must check isTableProcedure(proc) first, the cast is unchecked.
private static TableName getTableName(Procedure<?> proc) {
  return ((TableProcedureInterface)proc).getTableName();
}

// ============================================================================
// Server Queue Lookup Helpers
// ============================================================================
private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { 403 final int index = getBucketIndex(serverBuckets, serverName.hashCode()); 404 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 405 if (node != null) { 406 return node; 407 } 408 int priority; 409 if (proc != null) { 410 priority = MasterProcedureUtil.getServerPriority(proc); 411 } else { 412 priority = 1; 413 } 414 node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); 415 serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); 416 return node; 417 } 418 419 private void removeServerQueue(ServerName serverName) { 420 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 421 serverBuckets[index] = 422 AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 423 locking.removeServerLock(serverName); 424 } 425 426 private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) { 427 schedLock(); 428 try { 429 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 430 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 431 if (node == null) { 432 return; 433 } 434 435 LockAndQueue lock = locking.getServerLock(serverName); 436 if (node.isEmpty() && lock.tryExclusiveLock(proc)) { 437 removeFromRunQueue(serverRunQueue, node, 438 () -> "clean up server queue after " + proc + " completed"); 439 removeServerQueue(serverName); 440 } 441 } finally { 442 schedUnlock(); 443 } 444 } 445 446 private static int getBucketIndex(Object[] buckets, int hashCode) { 447 return Math.abs(hashCode) % buckets.length; 448 } 449 450 private static boolean isServerProcedure(Procedure<?> proc) { 451 return proc instanceof ServerProcedureInterface; 452 } 453 454 private static ServerName getServerName(Procedure<?> proc) { 455 return ((ServerProcedureInterface)proc).getServerName(); 456 } 457 458 // 
// ============================================================================
// Peer Queue Lookup Helpers
// ============================================================================
/**
 * Looks up the queue of the peer, creating and registering it when it does not exist yet.
 * @param peerId the peer id
 * @return the existing or newly created queue, never null
 */
private PeerQueue getPeerQueue(String peerId) {
  PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
  if (node != null) {
    return node;
  }
  node = new PeerQueue(peerId, locking.getPeerLock(peerId));
  peerMap = AvlTree.insert(peerMap, node);
  return node;
}

/**
 * Removes the queue of the peer from the peer tree, and the peer lock from the locking.
 */
private void removePeerQueue(String peerId) {
  peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
  locking.removePeerLock(peerId);
}

/**
 * Removes the queue of the peer if it exists, it is empty and the given procedure can take the
 * exclusive lock of the peer. Runs under the scheduler lock.
 * @param peerId the peer whose queue may be removed
 * @param procedure the completed procedure, used to try the exclusive lock
 */
private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
  schedLock();
  try {
    PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
    if (queue == null) {
      return;
    }

    final LockAndQueue lock = locking.getPeerLock(peerId);
    if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
      removeFromRunQueue(peerRunQueue, queue,
        () -> "clean up peer queue after " + procedure + " completed");
      removePeerQueue(peerId);
    }
  } finally {
    schedUnlock();
  }
}

private static boolean isPeerProcedure(Procedure<?> proc) {
  return proc instanceof PeerProcedureInterface;
}

// Callers must check isPeerProcedure(proc) first, the cast is unchecked.
private static String getPeerId(Procedure<?> proc) {
  return ((PeerProcedureInterface) proc).getPeerId();
}

// ============================================================================
// Meta Queue Lookup Helpers
// ============================================================================
/**
 * Looks up the meta queue (keyed by the meta table name), creating and registering it when it
 * does not exist yet.
 * @return the existing or newly created queue, never null
 */
private MetaQueue getMetaQueue() {
  MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
  if (node != null) {
    return node;
  }
  node = new MetaQueue(locking.getMetaLock());
  metaMap = AvlTree.insert(metaMap, node);
  return node;
}

private static boolean isMetaProcedure(Procedure<?> proc) {
  return proc instanceof MetaProcedureInterface;
}
// ============================================================================
// Table Locking Helpers
// ============================================================================
/**
 * Get lock info for a resource of specified type and name and log details.
 * Does nothing unless debug logging is enabled.
 * @param resourceType the type of the resource
 * @param resourceName the name of the resource
 */
private void logLockedResource(LockedResourceType resourceType, String resourceName) {
  if (!LOG.isDebugEnabled()) {
    return;
  }

  LockedResource lockedResource = getLockResource(resourceType, resourceName);
  if (lockedResource != null) {
    String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" +
        lockedResource.getSharedLockCount();

    Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
    if (proc != null) {
      msg += ", exclusively locked by procId=" + proc.getProcId();
    }
    LOG.debug(msg);
  }
}

/**
 * Suspend the procedure if the specified table is already locked.
 * Other operations in the table-queue will be executed after the lock is released.
 * The namespace shared lock is taken first, then the table exclusive lock.
 * @param procedure the procedure trying to acquire the lock
 * @param table Table to lock
 * @return true if the procedure has to wait for the table to be available
 */
public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
  schedLock();
  try {
    final String namespace = table.getNamespaceAsString();
    final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
    final LockAndQueue tableLock = locking.getTableLock(table);
    if (!namespaceLock.trySharedLock(procedure)) {
      waitProcedure(namespaceLock, procedure);
      logLockedResource(LockedResourceType.NAMESPACE, namespace);
      return true;
    }
    if (!tableLock.tryExclusiveLock(procedure)) {
      // roll back the namespace shared lock taken above before waiting on the table lock
      namespaceLock.releaseSharedLock();
      waitProcedure(tableLock, procedure);
      logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
      return true;
    }
    // both locks are held: take the table queue out of the run queue
    removeFromRunQueue(tableRunQueue, getTableQueue(table),
      () -> procedure + " held the exclusive lock");
    return false;
  } finally {
    schedUnlock();
  }
}

/**
 * Wake the procedures waiting for the specified table.
 * Releases the table exclusive lock and the namespace shared lock, and asks for the table queue
 * to be put back in the run queue.
 * @param procedure the procedure releasing the lock
 * @param table the name of the table that has the exclusive lock
 */
public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
  schedLock();
  try {
    final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
    final LockAndQueue tableLock = locking.getTableLock(table);
    int waitingCount = 0;
    if (tableLock.releaseExclusiveLock(procedure)) {
      waitingCount += wakeWaitingProcedures(tableLock);
    }
    if (namespaceLock.releaseSharedLock()) {
      waitingCount += wakeWaitingProcedures(namespaceLock);
    }
    addToRunQueue(tableRunQueue, getTableQueue(table),
      () -> procedure + " released the exclusive lock");
    wakePollIfNeeded(waitingCount);
  } finally {
    schedUnlock();
  }
}

/**
 * Suspend the procedure if the specified table is already locked.
 * Other "read" operations in the table-queue may be executed concurrently.
 * @param procedure the procedure trying to acquire the lock
 * @param table Table to lock
 * @return true if the procedure has to wait for the table to be available
 */
public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
  return waitTableQueueSharedLock(procedure, table) == null;
}

/**
 * Tries to take the namespace shared lock and then the table shared lock for the procedure.
 * @param procedure the procedure trying to acquire the lock
 * @param table Table to lock
 * @return the table queue if both locks were taken, null if the procedure was put to wait
 */
private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
  schedLock();
  try {
    final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
    final LockAndQueue tableLock = locking.getTableLock(table);
    if (!namespaceLock.trySharedLock(procedure)) {
      waitProcedure(namespaceLock, procedure);
      return null;
    }

    if (!tableLock.trySharedLock(procedure)) {
      // roll back the namespace shared lock taken above before waiting on the table lock
      namespaceLock.releaseSharedLock();
      waitProcedure(tableLock, procedure);
      return null;
    }

    return getTableQueue(table);
  } finally {
    schedUnlock();
  }
}

/**
 * Wake the procedures waiting for the specified table.
 * Releases the table shared lock and the namespace shared lock.
 * @param procedure the procedure releasing the lock
 * @param table the name of the table that has the shared lock
 */
public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
  schedLock();
  try {
    final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
    final LockAndQueue tableLock = locking.getTableLock(table);
    int waitingCount = 0;
    // the table queue is only re-added when releaseSharedLock() returns true
    if (tableLock.releaseSharedLock()) {
      addToRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " released the shared lock");
      waitingCount += wakeWaitingProcedures(tableLock);
    }
    if (namespaceLock.releaseSharedLock()) {
      waitingCount += wakeWaitingProcedures(namespaceLock);
    }
    wakePollIfNeeded(waitingCount);
  } finally {
    schedUnlock();
  }
}

/**
 * Tries to remove the queue and the table-lock of the specified table.
 * If there are new operations pending (e.g. a new create),
 * the remove will not be performed.
 * @param table the name of the table that should be marked as deleted
 * @param procedure the procedure that is removing the table
 * @return true if deletion succeeded, false otherwise meaning that there are
 *     other new operations pending for that table (e.g. a new create).
 */
@VisibleForTesting
boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
  schedLock();
  try {
    final TableQueue queue = getTableQueue(table);
    final LockAndQueue tableLock = locking.getTableLock(table);
    // NOTE(review): getTableQueue creates the queue when it is missing, so this null check
    // does not look reachable - confirm before relying on it.
    if (queue == null) return true;

    if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
      // remove the table from the run-queue and the map
      if (AvlIterableList.isLinked(queue)) {
        tableRunQueue.remove(queue);
      }
      removeTableQueue(table);
    } else {
      // TODO: If there are no create, we can drop all the other ops
      return false;
    }
  } finally {
    schedUnlock();
  }
  return true;
}

// ============================================================================
// Region Locking Helpers
// ============================================================================
/**
 * Suspend the procedure if the specified region is already locked.
 * @param procedure the procedure trying to acquire the lock on the region
 * @param regionInfo the region we are trying to lock
 * @return true if the procedure has to wait for the regions to be available
 */
public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
  return waitRegions(procedure, regionInfo.getTable(), regionInfo);
}

/**
 * Suspend the procedure if the specified set of regions are already locked.
 * The table shared lock is taken first, then the exclusive lock of each region. If one region
 * lock cannot be taken, the region locks taken so far and the table shared lock are released.
 * Note that the given array is sorted in place.
 * @param procedure the procedure trying to acquire the lock on the regions
 * @param table the table name of the regions we are trying to lock
 * @param regionInfo the list of regions we are trying to lock
 * @return true if the procedure has to wait for the regions to be available
 */
public boolean waitRegions(final Procedure<?> procedure, final TableName table,
    final RegionInfo... regionInfo) {
  Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
  schedLock();
  try {
    assert table != null;
    if (waitTableSharedLock(procedure, table)) {
      return true;
    }

    // acquire region xlocks or wait
    boolean hasLock = true;
    final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
    for (int i = 0; i < regionInfo.length; ++i) {
      assert regionInfo[i] != null;
      assert regionInfo[i].getTable() != null;
      assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
      // reference comparison: only catches the same instance passed twice
      assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];

      regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName());
      if (!regionLocks[i].tryExclusiveLock(procedure)) {
        LOG.info("Waiting on xlock for {} held by pid={}", procedure,
            regionLocks[i].getExclusiveLockProcIdOwner());
        waitProcedure(regionLocks[i], procedure);
        hasLock = false;
        // roll back the region locks acquired in the previous iterations
        while (i-- > 0) {
          regionLocks[i].releaseExclusiveLock(procedure);
        }
        break;
      } else {
        LOG.info("Took xlock for {}", procedure);
      }
    }

    if (!hasLock) {
      // give back the table shared lock taken at the top of the method
      wakeTableSharedLock(procedure, table);
    }
    return !hasLock;
  } finally {
    schedUnlock();
  }
}

/**
 * Wake the procedures waiting for the specified region
 * @param procedure the procedure that was holding the region
 * @param regionInfo the region the procedure was holding
 */
public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
  wakeRegions(procedure, regionInfo.getTable(), regionInfo);
}

/**
 * Wake the procedures waiting for the specified regions.
 * Releases the exclusive lock of each region, then the table shared lock. Note that the given
 * array is sorted in place.
 * @param procedure the procedure that was holding the regions
 * @param table the table name of the regions
 * @param regionInfo the list of regions the procedure was holding
 */
public void wakeRegions(final Procedure<?> procedure,final TableName table,
    final RegionInfo... regionInfo) {
  Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
  schedLock();
  try {
    int numProcs = 0;
    final Procedure<?>[] nextProcs = new Procedure[regionInfo.length];
    for (int i = 0; i < regionInfo.length; ++i) {
      assert regionInfo[i].getTable().equals(table);
      // reference comparison: only catches the same instance passed twice
      assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];

      LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
      if (regionLock.releaseExclusiveLock(procedure)) {
        if (!regionLock.isWaitingQueueEmpty()) {
          // release one procedure at the time since regions has an xlock
          nextProcs[numProcs++] = regionLock.removeFirst();
        } else {
          // nobody is waiting on this region: drop its lock entry
          locking.removeRegionLock(regionInfo[i].getEncodedName());
        }
      }
    }

    // awake procedures if any
    for (int i = numProcs - 1; i >= 0; --i) {
      wakeProcedure(nextProcs[i]);
    }
    wakePollIfNeeded(numProcs);
    // release the table shared-lock.
    wakeTableSharedLock(procedure, table);
  } finally {
    schedUnlock();
  }
}

// ============================================================================
// Namespace Locking Helpers
// ============================================================================
/**
 * Suspend the procedure if the specified namespace is already locked.
 * The shared lock of the namespace table is taken first, then the namespace exclusive lock.
 * @see #wakeNamespaceExclusiveLock(Procedure,String)
 * @param procedure the procedure trying to acquire the lock
 * @param namespace Namespace to lock
 * @return true if the procedure has to wait for the namespace to be available
 */
public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
  schedLock();
  try {
    final LockAndQueue systemNamespaceTableLock =
      locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
    if (!systemNamespaceTableLock.trySharedLock(procedure)) {
      waitProcedure(systemNamespaceTableLock, procedure);
      logLockedResource(LockedResourceType.TABLE,
        TableName.NAMESPACE_TABLE_NAME.getNameAsString());
      return true;
    }

    final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
    if (!namespaceLock.tryExclusiveLock(procedure)) {
      // roll back the namespace table shared lock taken above before waiting
      systemNamespaceTableLock.releaseSharedLock();
      waitProcedure(namespaceLock, procedure);
      logLockedResource(LockedResourceType.NAMESPACE, namespace);
      return true;
    }
    return false;
  } finally {
    schedUnlock();
  }
}

/**
 * Wake the procedures waiting for the specified namespace.
 * Releases the namespace exclusive lock and the shared lock of the namespace table.
 * @see #waitNamespaceExclusiveLock(Procedure,String)
 * @param procedure the procedure releasing the lock
 * @param namespace the namespace that has the exclusive lock
 */
public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
  schedLock();
  try {
    final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
    final LockAndQueue systemNamespaceTableLock =
      locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
    int waitingCount = 0;
    if (namespaceLock.releaseExclusiveLock(procedure)) {
      waitingCount += wakeWaitingProcedures(namespaceLock);
    }
    // the namespace table queue is only re-added when releaseSharedLock() returns true
    if (systemNamespaceTableLock.releaseSharedLock()) {
      addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
        () -> procedure + " released namespace exclusive lock");
      waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
    }
    wakePollIfNeeded(waitingCount);
  } finally {
    schedUnlock();
  }
}

// ============================================================================
// Server Locking Helpers
// ============================================================================
/**
 * Try to acquire the exclusive lock on the specified server.
 * On success the server queue is taken out of the run queue.
 * @see #wakeServerExclusiveLock(Procedure,ServerName)
 * @param procedure the procedure trying to acquire the lock
 * @param serverName Server to lock
 * @return true if the procedure has to wait for the server to be available
 */
public boolean waitServerExclusiveLock(final Procedure<?> procedure,
    final ServerName serverName) {
  schedLock();
  try {
    final LockAndQueue lock = locking.getServerLock(serverName);
    if (lock.tryExclusiveLock(procedure)) {
      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
      // so.
      removeFromRunQueue(serverRunQueue,
        getServerQueue(serverName,
          procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
            : null),
        () -> procedure + " held exclusive lock");
      return false;
    }
    waitProcedure(lock, procedure);
    logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
    return true;
  } finally {
    schedUnlock();
  }
}

/**
 * Wake the procedures waiting for the specified server.
 * Releases the server exclusive lock and asks for the server queue to be put back in the run
 * queue.
 * @see #waitServerExclusiveLock(Procedure,ServerName)
 * @param procedure the procedure releasing the lock
 * @param serverName the server that has the exclusive lock
 */
public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
  schedLock();
  try {
    final LockAndQueue lock = locking.getServerLock(serverName);
    // Only SCP will acquire/release server lock so do not need to check the return value here.
    lock.releaseExclusiveLock(procedure);
    // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
    // so.
    addToRunQueue(serverRunQueue,
      getServerQueue(serverName,
        procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
          : null), () -> procedure + " released exclusive lock");
    int waitingCount = wakeWaitingProcedures(lock);
    wakePollIfNeeded(waitingCount);
  } finally {
    schedUnlock();
  }
}

// ============================================================================
// Peer Locking Helpers
// ============================================================================
// Every peer operation type except REFRESH requires the exclusive lock.
private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
  return proc.getPeerOperationType() != PeerOperationType.REFRESH;
}

/**
 * Try to acquire the exclusive lock on the specified peer.
931 * @see #wakePeerExclusiveLock(Procedure, String) 932 * @param procedure the procedure trying to acquire the lock 933 * @param peerId peer to lock 934 * @return true if the procedure has to wait for the peer to be available 935 */ 936 public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) { 937 schedLock(); 938 try { 939 final LockAndQueue lock = locking.getPeerLock(peerId); 940 if (lock.tryExclusiveLock(procedure)) { 941 removeFromRunQueue(peerRunQueue, getPeerQueue(peerId), 942 () -> procedure + " held exclusive lock"); 943 return false; 944 } 945 waitProcedure(lock, procedure); 946 logLockedResource(LockedResourceType.PEER, peerId); 947 return true; 948 } finally { 949 schedUnlock(); 950 } 951 } 952 953 /** 954 * Wake the procedures waiting for the specified peer 955 * @see #waitPeerExclusiveLock(Procedure, String) 956 * @param procedure the procedure releasing the lock 957 * @param peerId the peer that has the exclusive lock 958 */ 959 public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) { 960 schedLock(); 961 try { 962 final LockAndQueue lock = locking.getPeerLock(peerId); 963 if (lock.releaseExclusiveLock(procedure)) { 964 addToRunQueue(peerRunQueue, getPeerQueue(peerId), 965 () -> procedure + " released exclusive lock"); 966 int waitingCount = wakeWaitingProcedures(lock); 967 wakePollIfNeeded(waitingCount); 968 } 969 } finally { 970 schedUnlock(); 971 } 972 } 973 974 // ============================================================================ 975 // Meta Locking Helpers 976 // ============================================================================ 977 /** 978 * Try to acquire the exclusive lock on meta. 979 * @see #wakeMetaExclusiveLock(Procedure) 980 * @param procedure the procedure trying to acquire the lock 981 * @return true if the procedure has to wait for meta to be available 982 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 983 * {@link RecoverMetaProcedure}. 
984 */ 985 @Deprecated 986 public boolean waitMetaExclusiveLock(Procedure<?> procedure) { 987 schedLock(); 988 try { 989 final LockAndQueue lock = locking.getMetaLock(); 990 if (lock.tryExclusiveLock(procedure)) { 991 removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock"); 992 return false; 993 } 994 waitProcedure(lock, procedure); 995 logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString()); 996 return true; 997 } finally { 998 schedUnlock(); 999 } 1000 } 1001 1002 /** 1003 * Wake the procedures waiting for meta. 1004 * @see #waitMetaExclusiveLock(Procedure) 1005 * @param procedure the procedure releasing the lock 1006 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 1007 * {@link RecoverMetaProcedure}. 1008 */ 1009 @Deprecated 1010 public void wakeMetaExclusiveLock(Procedure<?> procedure) { 1011 schedLock(); 1012 try { 1013 final LockAndQueue lock = locking.getMetaLock(); 1014 lock.releaseExclusiveLock(procedure); 1015 addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock"); 1016 int waitingCount = wakeWaitingProcedures(lock); 1017 wakePollIfNeeded(waitingCount); 1018 } finally { 1019 schedUnlock(); 1020 } 1021 } 1022 1023 /** 1024 * For debugging. Expensive. 1025 */ 1026 @VisibleForTesting 1027 public String dumpLocks() throws IOException { 1028 schedLock(); 1029 try { 1030 // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter 1031 return this.locking.toString(); 1032 } finally { 1033 schedUnlock(); 1034 } 1035 } 1036}