001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.function.Function; 024import java.util.function.Supplier; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.TableExistsException; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.TableNotFoundException; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; 031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; 032import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; 033import org.apache.hadoop.hbase.procedure2.LockAndQueue; 034import org.apache.hadoop.hbase.procedure2.LockStatus; 035import org.apache.hadoop.hbase.procedure2.LockedResource; 036import org.apache.hadoop.hbase.procedure2.LockedResourceType; 037import org.apache.hadoop.hbase.procedure2.Procedure; 038import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 039import
org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; 040import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; 041import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * ProcedureScheduler for the Master Procedures. 048 * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures 049 * that can be executed without having to wait on a lock. 050 * Most of the master operations can be executed concurrently, if they 051 * are operating on different tables (e.g. two create table procedures can be performed 052 * at the same time) or against two different servers; say two servers that crashed at 053 * about the same time. 054 * 055 * <p>Each procedure should implement an Interface providing information for this queue. 056 * For example table related procedures should implement TableProcedureInterface. 057 * Each procedure will be pushed in its own queue, and based on the operation type 058 * we may make smarter decisions: e.g. we can abort all the operations preceding 059 * a delete table, or similar. 060 * 061 * <h4>Concurrency control</h4> 062 * Concurrent access to member variables (metaRunQueue, tableRunQueue, serverRunQueue, 063 * peerRunQueue, locking, metaMap, tableMap, peerMap, serverBuckets) is controlled by schedLock().
This mainly includes:<br> 064 * <ul> 065 * <li> 066 * {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue 067 * when: 068 * <ol> 069 * <li>Queue was empty before push (so must have been out of run-queue)</li> 070 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it 071 * must have moved Queue out of run-queue)</li> 072 * </ol> 073 * </li> 074 * <li> 075 * {@link #poll(long)}: A poll will remove a Queue from run-queue when: 076 * <ol> 077 * <li>Queue becomes empty after poll</li> 078 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the 079 * procedure)</li> 080 * <li>Exclusive lock is requested but lock is not available (returns null)</li> 081 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is 082 * not a child</li> 083 * </ol> 084 * </li> 085 * <li> 086 * Namespace/table/region locks: Queue is added back to run-queue when lock being released is: 087 * <ol> 088 * <li>Exclusive lock</li> 089 * <li>Last shared lock (in case queue was removed because next procedure in queue required 090 * exclusive lock)</li> 091 * </ol> 092 * </li> 093 * </ul> 094 */ 095@InterfaceAudience.Private 096public class MasterProcedureScheduler extends AbstractProcedureScheduler { 097 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); 098 099 private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR = 100 (n, k) -> n.compareKey((ServerName) k); 101 private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR = 102 (n, k) -> n.compareKey((TableName) k); 103 private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR = 104 (n, k) -> n.compareKey((String) k); 105 private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = 106 (n, k) -> n.compareKey((TableName) k); 107 108 private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); 
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();

private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;

private final SchemaLocking locking;

/**
 * Creates the scheduler and its {@code SchemaLocking} bookkeeping.
 * @param procedureRetriever handed straight to {@code SchemaLocking}; presumably it resolves a
 *          procedure id to its {@link Procedure} (NOTE(review): confirm against SchemaLocking)
 */
public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
  locking = new SchemaLocking(procedureRetriever);
}

/**
 * Gives up the current turn of {@code proc} by pushing it back with {@code addFront=false}.
 */
@Override
public void yield(final Procedure proc) {
  push(proc, false, true);
}

/**
 * Adds {@code proc} to the queue matching its kind. Kinds are checked in the order meta, table,
 * server, peer, and the first matching interface wins.
 * @throws UnsupportedOperationException if the procedure implements none of the known interfaces
 */
@Override
protected void enqueue(final Procedure proc, final boolean addFront) {
  if (isMetaProcedure(proc)) {
    doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
  } else if (isTableProcedure(proc)) {
    doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
  } else if (isServerProcedure(proc)) {
    ServerProcedureInterface spi = (ServerProcedureInterface) proc;
    doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
  } else if (isPeerProcedure(proc)) {
    doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
  } else {
    // TODO: at the moment we only have meta, table, server and peer procedures. If you are
    // implementing a procedure of another kind, you have two options: create a group for all
    // such procedures, or find a key for them and implement something similar to the
    // TableRunQueue.
    throw new UnsupportedOperationException(
      "RQs for non-table/non-server procedures are not implemented yet: " + proc);
  }
}

/**
 * Appends (or prepends, if {@code addFront}) {@code proc} to {@code queue}. When the procedure
 * could make progress, it then puts {@code queue} back on the run queue {@code fairq}.
 */
private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
    Procedure<?> proc, boolean addFront) {
  queue.add(proc, addFront);
  // The queue is put back into execution when any of the following holds:
  // 1. The procedure already holds the lock, so it can be executed immediately.
  // 2. The procedure's lock was restored while loading after a restart.
  // 3. The exclusive lock for this queue is not held by anyone.
  // 4. The given procedure has access to the exclusive lock of this queue.
  Supplier<String> reason = null;
  if (proc.hasLock()) {
    reason = () -> proc + " has lock";
  } else if (proc.isLockedWhenLoading()) {
    reason = () -> proc + " restores lock when restarting";
  } else if (!queue.getLockStatus().hasExclusiveLock()) {
    reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
  } else if (queue.getLockStatus().hasLockAccess(proc)) {
    reason = () -> proc + " has the exclusive lock access";
  }
  if (reason != null) {
    addToRunQueue(fairq, queue, reason);
  }
}

@Override
protected boolean queueHasRunnables() {
  return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
    serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
}

/**
 * Returns the next runnable procedure. Run queues are tried in a fixed priority order (meta,
 * server, peer, table). Returns {@code null} when none of them yields a procedure.
 */
@Override
protected Procedure dequeue() {
  // meta procedure is always the first priority
  Procedure<?> pollResult = doPoll(metaRunQueue);
  // For now, let server handling have precedence over table handling; presumption is that it
  // is more important handling crashed servers than it is running the
  // enabling/disabling tables, etc.
  if (pollResult == null) {
    pollResult = doPoll(serverRunQueue);
  }
  if (pollResult == null) {
    pollResult = doPoll(peerRunQueue);
  }
  if (pollResult == null) {
    pollResult = doPoll(tableRunQueue);
  }
  return pollResult;
}

/**
 * Tells whether {@code proc} can run given the current lock state of {@code rq}.
 */
private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
  LockStatus s = rq.getLockStatus();
  // if we have the lock access, we are ready
  if (s.hasLockAccess(proc)) {
    return true;
  }
  boolean xlockReq = rq.requireExclusiveLock(proc);
  // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
  // the shared lock, otherwise, we just need to make sure that no one holds the xlock
  return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
}

/**
 * Takes the next queue from {@code fairq} and returns its first lock-ready procedure.
 * Procedures that are not ready are re-added to the back of the queue. Removes the queue from
 * the run queue when it becomes empty, or when none of its procedures is ready.
 * @return the procedure to run, or {@code null} if there is none
 */
private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
  Queue<T> rq = fairq.poll();
  if (rq == null || !rq.isAvailable()) {
    return null;
  }
  // loop until we find out a procedure which is ready to run, or if we have checked all the
  // procedures, then we give up and remove the queue from run queue.
  for (int i = 0, n = rq.size(); i < n; i++) {
    Procedure<?> proc = rq.poll();
    if (isLockReady(proc, rq)) {
      // the queue is empty, remove from run queue
      if (rq.isEmpty()) {
        removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
      }
      return proc;
    }
    // we are not ready to run, add back and try the next procedure
    rq.add(proc, false);
  }
  // no procedure is ready for execution, remove from run queue
  removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
  return null;
}

@Override
public List<LockedResource> getLocks() {
  schedLock();
  try {
    return locking.getLocks();
  } finally {
    schedUnlock();
  }
}

@Override
public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
  schedLock();
  try {
    return locking.getLockResource(resourceType, resourceName);
  } finally {
    schedUnlock();
  }
}

@Override
public void clear() {
  schedLock();
  try {
    clearQueue();
    locking.clear();
  } finally {
    schedUnlock();
  }
}

/**
 * Drops every server, table, peer and meta queue and unlinks each from its run queue.
 * The meta queue must be cleared too. {@link #queueSize()} counts it, so skipping it would
 * leave meta procedures behind and trip the assertion below.
 */
private void clearQueue() {
  // Remove Servers
  for (int i = 0; i < serverBuckets.length; ++i) {
    clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
    serverBuckets[i] = null;
  }

  // Remove Tables
  clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
  tableMap = null;

  // Remove Peers
  clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
  peerMap = null;

  // Remove Meta
  clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
  metaMap = null;

  assert size() == 0 : "expected queue size to be 0, got " + size();
}

/**
 * Removes every node of the AVL tree rooted at {@code treeMap}. Each node is also unlinked from
 * {@code fairq} when {@code fairq} is non-null. Only the local root is updated, so callers must
 * reset their own field.
 */
private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
  while (treeMap != null) {
    Queue<T> node = AvlTree.getFirst(treeMap);
    treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
    if (fairq != null) {
      removeFromRunQueue(fairq, node, () -> "clear all queues");
    }
  }
}

/** Sums the number of procedures across all queues of the tree rooted at {@code head}. */
private int queueSize(Queue<?> head) {
  int count = 0;
  AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
  while (iter.hasNext()) {
    count += iter.next().size();
  }
  return count;
}

@Override
protected int queueSize() {
  int count = 0;
  for (ServerQueue serverMap : serverBuckets) {
    count += queueSize(serverMap);
  }
  count += queueSize(tableMap);
  count += queueSize(peerMap);
  count += queueSize(metaMap);
  return count;
}

/**
 * Cleans up after a completed procedure.
 * <ul>
 * <li>Table procedures: the table queue is dropped when the table is considered gone.</li>
 * <li>Peer and server procedures: the respective queue is removed if possible.</li>
 * <li>Other procedure types: nothing is done.</li>
 * </ul>
 */
@Override
public void completionCleanup(final Procedure proc) {
  if (proc instanceof TableProcedureInterface) {
    TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
    boolean tableDeleted;
    if (proc.hasException()) {
      Exception procEx = proc.getException().unwrapRemoteException();
      if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
        // a failed create leaves no table behind, unless it failed because the table already
        // existed
        tableDeleted = !(procEx instanceof TableExistsException);
      } else {
        // the operation failed because the table does not exist
        tableDeleted = (procEx instanceof TableNotFoundException);
      }
    } else {
      // the table was deleted
      tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
    }
    if (tableDeleted) {
      markTableAsDeleted(iProcTable.getTableName(), proc);
    }
  } else if (proc instanceof PeerProcedureInterface) {
    tryCleanupPeerQueue(getPeerId(proc), proc);
  } else if (proc instanceof ServerProcedureInterface) {
    tryCleanupServerQueue(getServerName(proc), proc);
  }
  // No cleanup for other procedure types, yet.
}

/**
 * Links {@code queue} into {@code fairq} unless it is already linked or is empty.
 * The reason is only evaluated when TRACE logging is enabled.
 */
private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
    Supplier<String> reason) {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Add {} to run queue because: {}", queue, reason.get());
  }
  if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
    fairq.add(queue);
  }
}

/**
 * Unlinks {@code queue} from {@code fairq} if it is currently linked.
 * The reason is only evaluated when TRACE logging is enabled.
 */
private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
    Queue<T> queue, Supplier<String> reason) {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
  }
  if (AvlIterableList.isLinked(queue)) {
    fairq.remove(queue);
  }
}

// ============================================================================
//  Table Queue Lookup Helpers
// ============================================================================
/** Returns the queue for {@code tableName}, creating and inserting it if absent. */
private TableQueue getTableQueue(TableName tableName) {
  TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
  if (node != null) {
    return node;
  }

  node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
    locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
  tableMap = AvlTree.insert(tableMap, node);
  return node;
}

/** Removes the queue and the table lock of {@code tableName}. */
private void removeTableQueue(TableName tableName) {
  tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
  locking.removeTableLock(tableName);
}

private static boolean isTableProcedure(Procedure<?> proc) {
  return proc instanceof TableProcedureInterface;
}

private static TableName getTableName(Procedure<?> proc) {
  return ((TableProcedureInterface) proc).getTableName();
}

// ============================================================================
//  Server Queue Lookup Helpers
// ============================================================================
private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { 397 final int index = getBucketIndex(serverBuckets, serverName.hashCode()); 398 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 399 if (node != null) { 400 return node; 401 } 402 int priority; 403 if (proc != null) { 404 priority = MasterProcedureUtil.getServerPriority(proc); 405 } else { 406 priority = 1; 407 } 408 node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); 409 serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); 410 return node; 411 } 412 413 private void removeServerQueue(ServerName serverName) { 414 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 415 serverBuckets[index] = 416 AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 417 locking.removeServerLock(serverName); 418 } 419 420 private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) { 421 schedLock(); 422 try { 423 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 424 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 425 if (node == null) { 426 return; 427 } 428 429 LockAndQueue lock = locking.getServerLock(serverName); 430 if (node.isEmpty() && lock.tryExclusiveLock(proc)) { 431 removeFromRunQueue(serverRunQueue, node, 432 () -> "clean up server queue after " + proc + " completed"); 433 removeServerQueue(serverName); 434 } 435 } finally { 436 schedUnlock(); 437 } 438 } 439 440 private static int getBucketIndex(Object[] buckets, int hashCode) { 441 return Math.abs(hashCode) % buckets.length; 442 } 443 444 private static boolean isServerProcedure(Procedure<?> proc) { 445 return proc instanceof ServerProcedureInterface; 446 } 447 448 private static ServerName getServerName(Procedure<?> proc) { 449 return ((ServerProcedureInterface)proc).getServerName(); 450 } 451 452 // 
============================================================================ 453 // Peer Queue Lookup Helpers 454 // ============================================================================ 455 private PeerQueue getPeerQueue(String peerId) { 456 PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 457 if (node != null) { 458 return node; 459 } 460 node = new PeerQueue(peerId, locking.getPeerLock(peerId)); 461 peerMap = AvlTree.insert(peerMap, node); 462 return node; 463 } 464 465 private void removePeerQueue(String peerId) { 466 peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 467 locking.removePeerLock(peerId); 468 } 469 470 private void tryCleanupPeerQueue(String peerId, Procedure procedure) { 471 schedLock(); 472 try { 473 PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 474 if (queue == null) { 475 return; 476 } 477 478 final LockAndQueue lock = locking.getPeerLock(peerId); 479 if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { 480 removeFromRunQueue(peerRunQueue, queue, 481 () -> "clean up peer queue after " + procedure + " completed"); 482 removePeerQueue(peerId); 483 } 484 } finally { 485 schedUnlock(); 486 } 487 } 488 489 private static boolean isPeerProcedure(Procedure<?> proc) { 490 return proc instanceof PeerProcedureInterface; 491 } 492 493 private static String getPeerId(Procedure<?> proc) { 494 return ((PeerProcedureInterface) proc).getPeerId(); 495 } 496 497 // ============================================================================ 498 // Meta Queue Lookup Helpers 499 // ============================================================================ 500 private MetaQueue getMetaQueue() { 501 MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR); 502 if (node != null) { 503 return node; 504 } 505 node = new MetaQueue(locking.getMetaLock()); 506 metaMap = AvlTree.insert(metaMap, node); 507 return node; 508 } 509 510 private static 
boolean isMetaProcedure(Procedure<?> proc) { 511 return proc instanceof MetaProcedureInterface; 512 } 513 // ============================================================================ 514 // Table Locking Helpers 515 // ============================================================================ 516 /** 517 * Get lock info for a resource of specified type and name and log details 518 */ 519 private void logLockedResource(LockedResourceType resourceType, String resourceName) { 520 if (!LOG.isDebugEnabled()) { 521 return; 522 } 523 524 LockedResource lockedResource = getLockResource(resourceType, resourceName); 525 if (lockedResource != null) { 526 String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" + 527 lockedResource.getSharedLockCount(); 528 529 Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure(); 530 if (proc != null) { 531 msg += ", exclusively locked by procId=" + proc.getProcId(); 532 } 533 LOG.debug(msg); 534 } 535 } 536 537 /** 538 * Suspend the procedure if the specified table is already locked. 539 * Other operations in the table-queue will be executed after the lock is released. 
540 * @param procedure the procedure trying to acquire the lock 541 * @param table Table to lock 542 * @return true if the procedure has to wait for the table to be available 543 */ 544 public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 545 schedLock(); 546 try { 547 final String namespace = table.getNamespaceAsString(); 548 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 549 final LockAndQueue tableLock = locking.getTableLock(table); 550 if (!namespaceLock.trySharedLock(procedure)) { 551 waitProcedure(namespaceLock, procedure); 552 logLockedResource(LockedResourceType.NAMESPACE, namespace); 553 return true; 554 } 555 if (!tableLock.tryExclusiveLock(procedure)) { 556 namespaceLock.releaseSharedLock(); 557 waitProcedure(tableLock, procedure); 558 logLockedResource(LockedResourceType.TABLE, table.getNameAsString()); 559 return true; 560 } 561 removeFromRunQueue(tableRunQueue, getTableQueue(table), 562 () -> procedure + " held the exclusive lock"); 563 return false; 564 } finally { 565 schedUnlock(); 566 } 567 } 568 569 /** 570 * Wake the procedures waiting for the specified table 571 * @param procedure the procedure releasing the lock 572 * @param table the name of the table that has the exclusive lock 573 */ 574 public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 575 schedLock(); 576 try { 577 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 578 final LockAndQueue tableLock = locking.getTableLock(table); 579 int waitingCount = 0; 580 if (tableLock.releaseExclusiveLock(procedure)) { 581 waitingCount += wakeWaitingProcedures(tableLock); 582 } 583 if (namespaceLock.releaseSharedLock()) { 584 waitingCount += wakeWaitingProcedures(namespaceLock); 585 } 586 addToRunQueue(tableRunQueue, getTableQueue(table), 587 () -> procedure + " released the exclusive lock"); 588 wakePollIfNeeded(waitingCount); 589 } finally { 590 
schedUnlock(); 591 } 592 } 593 594 /** 595 * Suspend the procedure if the specified table is already locked. 596 * other "read" operations in the table-queue may be executed concurrently, 597 * @param procedure the procedure trying to acquire the lock 598 * @param table Table to lock 599 * @return true if the procedure has to wait for the table to be available 600 */ 601 public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) { 602 return waitTableQueueSharedLock(procedure, table) == null; 603 } 604 605 private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) { 606 schedLock(); 607 try { 608 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 609 final LockAndQueue tableLock = locking.getTableLock(table); 610 if (!namespaceLock.trySharedLock(procedure)) { 611 waitProcedure(namespaceLock, procedure); 612 return null; 613 } 614 615 if (!tableLock.trySharedLock(procedure)) { 616 namespaceLock.releaseSharedLock(); 617 waitProcedure(tableLock, procedure); 618 return null; 619 } 620 621 return getTableQueue(table); 622 } finally { 623 schedUnlock(); 624 } 625 } 626 627 /** 628 * Wake the procedures waiting for the specified table 629 * @param procedure the procedure releasing the lock 630 * @param table the name of the table that has the shared lock 631 */ 632 public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) { 633 schedLock(); 634 try { 635 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 636 final LockAndQueue tableLock = locking.getTableLock(table); 637 int waitingCount = 0; 638 if (tableLock.releaseSharedLock()) { 639 addToRunQueue(tableRunQueue, getTableQueue(table), 640 () -> procedure + " released the shared lock"); 641 waitingCount += wakeWaitingProcedures(tableLock); 642 } 643 if (namespaceLock.releaseSharedLock()) { 644 waitingCount += wakeWaitingProcedures(namespaceLock); 645 
} 646 wakePollIfNeeded(waitingCount); 647 } finally { 648 schedUnlock(); 649 } 650 } 651 652 /** 653 * Tries to remove the queue and the table-lock of the specified table. 654 * If there are new operations pending (e.g. a new create), 655 * the remove will not be performed. 656 * @param table the name of the table that should be marked as deleted 657 * @param procedure the procedure that is removing the table 658 * @return true if deletion succeeded, false otherwise meaning that there are 659 * other new operations pending for that table (e.g. a new create). 660 */ 661 boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) { 662 schedLock(); 663 try { 664 final TableQueue queue = getTableQueue(table); 665 final LockAndQueue tableLock = locking.getTableLock(table); 666 if (queue == null) return true; 667 668 if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { 669 // remove the table from the run-queue and the map 670 if (AvlIterableList.isLinked(queue)) { 671 tableRunQueue.remove(queue); 672 } 673 removeTableQueue(table); 674 } else { 675 // TODO: If there are no create, we can drop all the other ops 676 return false; 677 } 678 } finally { 679 schedUnlock(); 680 } 681 return true; 682 } 683 684 // ============================================================================ 685 // Region Locking Helpers 686 // ============================================================================ 687 /** 688 * Suspend the procedure if the specified region is already locked. 
689 * @param procedure the procedure trying to acquire the lock on the region 690 * @param regionInfo the region we are trying to lock 691 * @return true if the procedure has to wait for the regions to be available 692 */ 693 public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 694 return waitRegions(procedure, regionInfo.getTable(), regionInfo); 695 } 696 697 /** 698 * Suspend the procedure if the specified set of regions are already locked. 699 * @param procedure the procedure trying to acquire the lock on the regions 700 * @param table the table name of the regions we are trying to lock 701 * @param regionInfos the list of regions we are trying to lock 702 * @return true if the procedure has to wait for the regions to be available 703 */ 704 public boolean waitRegions(final Procedure<?> procedure, final TableName table, 705 final RegionInfo... regionInfos) { 706 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 707 schedLock(); 708 try { 709 assert table != null; 710 if (waitTableSharedLock(procedure, table)) { 711 return true; 712 } 713 714 // acquire region xlocks or wait 715 boolean hasLock = true; 716 final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length]; 717 for (int i = 0; i < regionInfos.length; ++i) { 718 assert regionInfos[i] != null; 719 assert regionInfos[i].getTable() != null; 720 assert regionInfos[i].getTable().equals(table): regionInfos[i] + " " + procedure; 721 assert i == 0 || regionInfos[i] != regionInfos[i - 1] : "duplicate region: " + regionInfos[i]; 722 723 regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName()); 724 if (!regionLocks[i].tryExclusiveLock(procedure)) { 725 LOG.info("Waiting on xlock for {} held by pid={}", procedure, 726 regionLocks[i].getExclusiveLockProcIdOwner()); 727 waitProcedure(regionLocks[i], procedure); 728 hasLock = false; 729 while (i-- > 0) { 730 regionLocks[i].releaseExclusiveLock(procedure); 731 } 732 break; 733 } else { 734 LOG.info("Took xlock 
for {}", procedure); 735 } 736 } 737 738 if (!hasLock) { 739 wakeTableSharedLock(procedure, table); 740 } 741 return !hasLock; 742 } finally { 743 schedUnlock(); 744 } 745 } 746 747 /** 748 * Wake the procedures waiting for the specified region 749 * @param procedure the procedure that was holding the region 750 * @param regionInfo the region the procedure was holding 751 */ 752 public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 753 wakeRegions(procedure, regionInfo.getTable(), regionInfo); 754 } 755 756 /** 757 * Wake the procedures waiting for the specified regions 758 * @param procedure the procedure that was holding the regions 759 * @param regionInfos the list of regions the procedure was holding 760 */ 761 public void wakeRegions(final Procedure<?> procedure,final TableName table, 762 final RegionInfo... regionInfos) { 763 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 764 schedLock(); 765 try { 766 int numProcs = 0; 767 final Procedure<?>[] nextProcs = new Procedure[regionInfos.length]; 768 for (int i = 0; i < regionInfos.length; ++i) { 769 assert regionInfos[i].getTable().equals(table); 770 assert i == 0 || regionInfos[i] != regionInfos[i - 1] : "duplicate region: " + regionInfos[i]; 771 772 LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName()); 773 if (regionLock.releaseExclusiveLock(procedure)) { 774 if (!regionLock.isWaitingQueueEmpty()) { 775 // release one procedure at the time since regions has an xlock 776 nextProcs[numProcs++] = regionLock.removeFirst(); 777 } else { 778 locking.removeRegionLock(regionInfos[i].getEncodedName()); 779 } 780 } 781 } 782 783 // awake procedures if any 784 for (int i = numProcs - 1; i >= 0; --i) { 785 wakeProcedure(nextProcs[i]); 786 } 787 wakePollIfNeeded(numProcs); 788 // release the table shared-lock. 
789 wakeTableSharedLock(procedure, table); 790 } finally { 791 schedUnlock(); 792 } 793 } 794 795 // ============================================================================ 796 // Namespace Locking Helpers 797 // ============================================================================ 798 /** 799 * Suspend the procedure if the specified namespace is already locked. 800 * @see #wakeNamespaceExclusiveLock(Procedure,String) 801 * @param procedure the procedure trying to acquire the lock 802 * @param namespace Namespace to lock 803 * @return true if the procedure has to wait for the namespace to be available 804 */ 805 public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) { 806 schedLock(); 807 try { 808 final LockAndQueue systemNamespaceTableLock = 809 locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); 810 if (!systemNamespaceTableLock.trySharedLock(procedure)) { 811 waitProcedure(systemNamespaceTableLock, procedure); 812 logLockedResource(LockedResourceType.TABLE, 813 TableName.NAMESPACE_TABLE_NAME.getNameAsString()); 814 return true; 815 } 816 817 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 818 if (!namespaceLock.tryExclusiveLock(procedure)) { 819 systemNamespaceTableLock.releaseSharedLock(); 820 waitProcedure(namespaceLock, procedure); 821 logLockedResource(LockedResourceType.NAMESPACE, namespace); 822 return true; 823 } 824 return false; 825 } finally { 826 schedUnlock(); 827 } 828 } 829 830 /** 831 * Wake the procedures waiting for the specified namespace 832 * @see #waitNamespaceExclusiveLock(Procedure,String) 833 * @param procedure the procedure releasing the lock 834 * @param namespace the namespace that has the exclusive lock 835 */ 836 public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) { 837 schedLock(); 838 try { 839 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 840 final LockAndQueue systemNamespaceTableLock = 841 
locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); 842 int waitingCount = 0; 843 if (namespaceLock.releaseExclusiveLock(procedure)) { 844 waitingCount += wakeWaitingProcedures(namespaceLock); 845 } 846 if (systemNamespaceTableLock.releaseSharedLock()) { 847 addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME), 848 () -> procedure + " released namespace exclusive lock"); 849 waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); 850 } 851 wakePollIfNeeded(waitingCount); 852 } finally { 853 schedUnlock(); 854 } 855 } 856 857 // ============================================================================ 858 // Server Locking Helpers 859 // ============================================================================ 860 /** 861 * Try to acquire the exclusive lock on the specified server. 862 * @see #wakeServerExclusiveLock(Procedure,ServerName) 863 * @param procedure the procedure trying to acquire the lock 864 * @param serverName Server to lock 865 * @return true if the procedure has to wait for the server to be available 866 */ 867 public boolean waitServerExclusiveLock(final Procedure<?> procedure, 868 final ServerName serverName) { 869 schedLock(); 870 try { 871 final LockAndQueue lock = locking.getServerLock(serverName); 872 if (lock.tryExclusiveLock(procedure)) { 873 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 874 // so. 875 removeFromRunQueue(serverRunQueue, 876 getServerQueue(serverName, 877 procedure instanceof ServerProcedureInterface ? 
(ServerProcedureInterface) procedure 878 : null), 879 () -> procedure + " held exclusive lock"); 880 return false; 881 } 882 waitProcedure(lock, procedure); 883 logLockedResource(LockedResourceType.SERVER, serverName.getServerName()); 884 return true; 885 } finally { 886 schedUnlock(); 887 } 888 } 889 890 /** 891 * Wake the procedures waiting for the specified server 892 * @see #waitServerExclusiveLock(Procedure,ServerName) 893 * @param procedure the procedure releasing the lock 894 * @param serverName the server that has the exclusive lock 895 */ 896 public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) { 897 schedLock(); 898 try { 899 final LockAndQueue lock = locking.getServerLock(serverName); 900 // Only SCP will acquire/release server lock so do not need to check the return value here. 901 lock.releaseExclusiveLock(procedure); 902 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 903 // so. 904 addToRunQueue(serverRunQueue, 905 getServerQueue(serverName, 906 procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure 907 : null), () -> procedure + " released exclusive lock"); 908 int waitingCount = wakeWaitingProcedures(lock); 909 wakePollIfNeeded(waitingCount); 910 } finally { 911 schedUnlock(); 912 } 913 } 914 915 // ============================================================================ 916 // Peer Locking Helpers 917 // ============================================================================ 918 private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { 919 return proc.getPeerOperationType() != PeerOperationType.REFRESH; 920 } 921 922 /** 923 * Try to acquire the exclusive lock on the specified peer. 
924 * @see #wakePeerExclusiveLock(Procedure, String) 925 * @param procedure the procedure trying to acquire the lock 926 * @param peerId peer to lock 927 * @return true if the procedure has to wait for the peer to be available 928 */ 929 public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) { 930 schedLock(); 931 try { 932 final LockAndQueue lock = locking.getPeerLock(peerId); 933 if (lock.tryExclusiveLock(procedure)) { 934 removeFromRunQueue(peerRunQueue, getPeerQueue(peerId), 935 () -> procedure + " held exclusive lock"); 936 return false; 937 } 938 waitProcedure(lock, procedure); 939 logLockedResource(LockedResourceType.PEER, peerId); 940 return true; 941 } finally { 942 schedUnlock(); 943 } 944 } 945 946 /** 947 * Wake the procedures waiting for the specified peer 948 * @see #waitPeerExclusiveLock(Procedure, String) 949 * @param procedure the procedure releasing the lock 950 * @param peerId the peer that has the exclusive lock 951 */ 952 public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) { 953 schedLock(); 954 try { 955 final LockAndQueue lock = locking.getPeerLock(peerId); 956 if (lock.releaseExclusiveLock(procedure)) { 957 addToRunQueue(peerRunQueue, getPeerQueue(peerId), 958 () -> procedure + " released exclusive lock"); 959 int waitingCount = wakeWaitingProcedures(lock); 960 wakePollIfNeeded(waitingCount); 961 } 962 } finally { 963 schedUnlock(); 964 } 965 } 966 967 // ============================================================================ 968 // Meta Locking Helpers 969 // ============================================================================ 970 /** 971 * Try to acquire the exclusive lock on meta. 972 * @see #wakeMetaExclusiveLock(Procedure) 973 * @param procedure the procedure trying to acquire the lock 974 * @return true if the procedure has to wait for meta to be available 975 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 976 * {@link RecoverMetaProcedure}. 
977 */ 978 @Deprecated 979 public boolean waitMetaExclusiveLock(Procedure<?> procedure) { 980 schedLock(); 981 try { 982 final LockAndQueue lock = locking.getMetaLock(); 983 if (lock.tryExclusiveLock(procedure)) { 984 removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock"); 985 return false; 986 } 987 waitProcedure(lock, procedure); 988 logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString()); 989 return true; 990 } finally { 991 schedUnlock(); 992 } 993 } 994 995 /** 996 * Wake the procedures waiting for meta. 997 * @see #waitMetaExclusiveLock(Procedure) 998 * @param procedure the procedure releasing the lock 999 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 1000 * {@link RecoverMetaProcedure}. 1001 */ 1002 @Deprecated 1003 public void wakeMetaExclusiveLock(Procedure<?> procedure) { 1004 schedLock(); 1005 try { 1006 final LockAndQueue lock = locking.getMetaLock(); 1007 lock.releaseExclusiveLock(procedure); 1008 addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock"); 1009 int waitingCount = wakeWaitingProcedures(lock); 1010 wakePollIfNeeded(waitingCount); 1011 } finally { 1012 schedUnlock(); 1013 } 1014 } 1015 1016 /** 1017 * For debugging. Expensive. 1018 */ 1019 public String dumpLocks() throws IOException { 1020 schedLock(); 1021 try { 1022 // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter 1023 return this.locking.toString(); 1024 } finally { 1025 schedUnlock(); 1026 } 1027 } 1028}