001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.function.Function; 024import java.util.function.Supplier; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.TableExistsException; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.TableNotFoundException; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; 031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; 032import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; 033import org.apache.hadoop.hbase.procedure2.LockAndQueue; 034import org.apache.hadoop.hbase.procedure2.LockStatus; 035import org.apache.hadoop.hbase.procedure2.LockedResource; 036import org.apache.hadoop.hbase.procedure2.LockedResourceType; 037import org.apache.hadoop.hbase.procedure2.Procedure; 038import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 039import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; 040import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; 041import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the 048 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the 049 * master operations can be executed concurrently, if they are operating on different tables (e.g. 050 * two create table procedures can be performed at the same time) or against two different servers; 051 * say two servers that crashed at about the same time. 052 * <p> 053 * Each procedure should implement an Interface providing information for this queue. For example 054 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed 055 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can 056 * abort all the operations preceding a delete table, or similar. 057 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue, 058 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). This mainly 059 * includes:<br> 060 * <ul> 061 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when: 062 * <ol> 063 * <li>Queue was empty before push (so must have been out of run-queue)</li> 064 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have 065 * moved Queue out of run-queue)</li> 066 * </ol> 067 * </li> 068 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when: 069 * <ol> 070 * <li>Queue becomes empty after poll</li> 071 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the 072 * procedure)</li> 073 * <li>Exclusive lock is requested but lock is not available (returns null)</li> 074 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a 075 * child</li> 076 * </ol> 077 * </li> 078 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is: 079 * <ol> 080 * <li>Exclusive lock</li> 081 * <li>Last shared lock (in case queue was removed because next procedure in queue required 082 * exclusive lock)</li> 083 * </ol> 084 * </li> 085 * </ul> 086 */ 087@InterfaceAudience.Private 088public class MasterProcedureScheduler extends AbstractProcedureScheduler { 089 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); 090 091 private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR = 092 (n, k) -> n.compareKey((ServerName) k); 093 private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR = 094 (n, k) -> n.compareKey((TableName) k); 095 private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR = 096 (n, k) -> n.compareKey((String) k); 097 private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = 098 (n, k) -> n.compareKey((TableName) k); 099 100 private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); 101 private final FairQueue<TableName> tableRunQueue = new FairQueue<>(); 102 private final FairQueue<String> peerRunQueue = new FairQueue<>(); 103 private final FairQueue<TableName> metaRunQueue = new FairQueue<>(); 104 105 private final ServerQueue[] serverBuckets = new ServerQueue[128]; 106 private TableQueue tableMap = null; 107 private PeerQueue peerMap = null; 108 private MetaQueue metaMap = null; 109 110 private final SchemaLocking locking; 111 112 public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) { 113 locking = new SchemaLocking(procedureRetriever); 114 } 115 116 @Override 117 public void yield(final Procedure proc) { 118 push(proc, false, true); 119 } 120 121 @Override 122 protected void enqueue(final Procedure proc, final boolean addFront) { 123 if (isMetaProcedure(proc)) { 124 doAdd(metaRunQueue, getMetaQueue(), proc, addFront); 125 } else if (isTableProcedure(proc)) { 126 doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); 127 } else if (isServerProcedure(proc)) { 128 ServerProcedureInterface spi = (ServerProcedureInterface) proc; 129 doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); 130 } else if (isPeerProcedure(proc)) { 131 doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); 132 } else { 133 // TODO: at the moment we only have Table and Server procedures 134 // if you are implementing a non-table/non-server procedure, you have two options: create 135 // a group for all the non-table/non-server procedures or try to find a key for your 136 // non-table/non-server procedures and implement something similar to the TableRunQueue. 137 throw new UnsupportedOperationException( 138 "RQs for non-table/non-server procedures are not implemented yet: " + proc); 139 } 140 } 141 142 private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue, 143 Procedure<?> proc, boolean addFront) { 144 queue.add(proc, addFront); 145 // For the following conditions, we will put the queue back into execution 146 // 1. The procedure has already held the lock, or the lock has been restored when restarting, 147 // which means it can be executed immediately. 148 // 2. The exclusive lock for this queue has not been held. 149 // 3. The given procedure has the exclusive lock permission for this queue. 150 Supplier<String> reason = null; 151 if (proc.hasLock()) { 152 reason = () -> proc + " has lock"; 153 } else if (proc.isLockedWhenLoading()) { 154 reason = () -> proc + " restores lock when restarting"; 155 } else if (!queue.getLockStatus().hasExclusiveLock()) { 156 reason = () -> "the exclusive lock is not held by anyone when adding " + proc; 157 } else if (queue.getLockStatus().hasLockAccess(proc)) { 158 reason = () -> proc + " has the excusive lock access"; 159 } 160 if (reason != null) { 161 addToRunQueue(fairq, queue, reason); 162 } 163 } 164 165 @Override 166 protected boolean queueHasRunnables() { 167 return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() 168 || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables(); 169 } 170 171 @Override 172 protected Procedure dequeue() { 173 // meta procedure is always the first priority 174 Procedure<?> pollResult = doPoll(metaRunQueue); 175 // For now, let server handling have precedence over table handling; presumption is that it 176 // is more important handling crashed servers than it is running the 177 // enabling/disabling tables, etc. 178 if (pollResult == null) { 179 pollResult = doPoll(serverRunQueue); 180 } 181 if (pollResult == null) { 182 pollResult = doPoll(peerRunQueue); 183 } 184 if (pollResult == null) { 185 pollResult = doPoll(tableRunQueue); 186 } 187 return pollResult; 188 } 189 190 private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) { 191 LockStatus s = rq.getLockStatus(); 192 // if we have the lock access, we are ready 193 if (s.hasLockAccess(proc)) { 194 return true; 195 } 196 boolean xlockReq = rq.requireExclusiveLock(proc); 197 // if we need to hold the xlock, then we need to make sure that no one holds any lock, including 198 // the shared lock, otherwise, we just need to make sure that no one holds the xlock 199 return xlockReq ? !s.isLocked() : !s.hasExclusiveLock(); 200 } 201 202 private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) { 203 Queue<T> rq = fairq.poll(); 204 if (rq == null || !rq.isAvailable()) { 205 return null; 206 } 207 // loop until we find out a procedure which is ready to run, or if we have checked all the 208 // procedures, then we give up and remove the queue from run queue. 209 for (int i = 0, n = rq.size(); i < n; i++) { 210 Procedure<?> proc = rq.poll(); 211 if (isLockReady(proc, rq)) { 212 // the queue is empty, remove from run queue 213 if (rq.isEmpty()) { 214 removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc); 215 } 216 return proc; 217 } 218 // we are not ready to run, add back and try the next procedure 219 rq.add(proc, false); 220 } 221 // no procedure is ready for execution, remove from run queue 222 removeFromRunQueue(fairq, rq, () -> "no procedure can be executed"); 223 return null; 224 } 225 226 @Override 227 public List<LockedResource> getLocks() { 228 schedLock(); 229 try { 230 return locking.getLocks(); 231 } finally { 232 schedUnlock(); 233 } 234 } 235 236 @Override 237 public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) { 238 schedLock(); 239 try { 240 return locking.getLockResource(resourceType, resourceName); 241 } finally { 242 schedUnlock(); 243 } 244 } 245 246 @Override 247 public void clear() { 248 schedLock(); 249 try { 250 clearQueue(); 251 locking.clear(); 252 } finally { 253 schedUnlock(); 254 } 255 } 256 257 private void clearQueue() { 258 // Remove Servers 259 for (int i = 0; i < serverBuckets.length; ++i) { 260 clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); 261 serverBuckets[i] = null; 262 } 263 264 // Remove Tables 265 clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); 266 tableMap = null; 267 268 // Remove Peers 269 clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR); 270 peerMap = null; 271 272 assert size() == 0 : "expected queue size to be 0, got " + size(); 273 } 274 275 private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap, 276 FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) { 277 while (treeMap != null) { 278 Queue<T> node = AvlTree.getFirst(treeMap); 279 treeMap = AvlTree.remove(treeMap, node.getKey(), comparator); 280 if (fairq != null) { 281 removeFromRunQueue(fairq, node, () -> "clear all queues"); 282 } 283 } 284 } 285 286 private int queueSize(Queue<?> head) { 287 int count = 0; 288 AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head); 289 while (iter.hasNext()) { 290 count += iter.next().size(); 291 } 292 return count; 293 } 294 295 @Override 296 protected int queueSize() { 297 int count = 0; 298 for (ServerQueue serverMap : serverBuckets) { 299 count += queueSize(serverMap); 300 } 301 count += queueSize(tableMap); 302 count += queueSize(peerMap); 303 count += queueSize(metaMap); 304 return count; 305 } 306 307 @Override 308 public void completionCleanup(final Procedure proc) { 309 if (proc instanceof TableProcedureInterface) { 310 TableProcedureInterface iProcTable = (TableProcedureInterface) proc; 311 boolean tableDeleted; 312 if (proc.hasException()) { 313 Exception procEx = proc.getException().unwrapRemoteException(); 314 if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { 315 // create failed because the table already exist 316 tableDeleted = !(procEx instanceof TableExistsException); 317 } else { 318 // the operation failed because the table does not exist 319 tableDeleted = (procEx instanceof TableNotFoundException); 320 } 321 } else { 322 // the table was deleted 323 tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); 324 } 325 if (tableDeleted) { 326 markTableAsDeleted(iProcTable.getTableName(), proc); 327 return; 328 } 329 } else if (proc instanceof PeerProcedureInterface) { 330 tryCleanupPeerQueue(getPeerId(proc), proc); 331 } else if (proc instanceof ServerProcedureInterface) { 332 tryCleanupServerQueue(getServerName(proc), proc); 333 } else { 334 // No cleanup for other procedure types, yet. 335 return; 336 } 337 } 338 339 private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue, 340 Supplier<String> reason) { 341 if (LOG.isTraceEnabled()) { 342 LOG.trace("Add {} to run queue because: {}", queue, reason.get()); 343 } 344 if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) { 345 fairq.add(queue); 346 } 347 } 348 349 private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, 350 Queue<T> queue, Supplier<String> reason) { 351 if (LOG.isTraceEnabled()) { 352 LOG.trace("Remove {} from run queue because: {}", queue, reason.get()); 353 } 354 if (AvlIterableList.isLinked(queue)) { 355 fairq.remove(queue); 356 } 357 } 358 359 // ============================================================================ 360 // Table Queue Lookup Helpers 361 // ============================================================================ 362 private TableQueue getTableQueue(TableName tableName) { 363 TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); 364 if (node != null) return node; 365 366 node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName), 367 locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString())); 368 tableMap = AvlTree.insert(tableMap, node); 369 return node; 370 } 371 372 private void removeTableQueue(TableName tableName) { 373 tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); 374 locking.removeTableLock(tableName); 375 } 376 377 private static boolean isTableProcedure(Procedure<?> proc) { 378 return proc instanceof TableProcedureInterface; 379 } 380 381 private static TableName getTableName(Procedure<?> proc) { 382 return ((TableProcedureInterface) proc).getTableName(); 383 } 384 385 // ============================================================================ 386 // Server Queue Lookup Helpers 387 // ============================================================================ 388 private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { 389 final int index = getBucketIndex(serverBuckets, serverName.hashCode()); 390 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 391 if (node != null) { 392 return node; 393 } 394 int priority; 395 if (proc != null) { 396 priority = MasterProcedureUtil.getServerPriority(proc); 397 } else { 398 priority = 1; 399 } 400 node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); 401 serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); 402 return node; 403 } 404 405 private void removeServerQueue(ServerName serverName) { 406 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 407 serverBuckets[index] = 408 AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 409 locking.removeServerLock(serverName); 410 } 411 412 private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) { 413 schedLock(); 414 try { 415 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 416 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 417 if (node == null) { 418 return; 419 } 420 421 LockAndQueue lock = locking.getServerLock(serverName); 422 if (node.isEmpty() && lock.tryExclusiveLock(proc)) { 423 removeFromRunQueue(serverRunQueue, node, 424 () -> "clean up server queue after " + proc + " completed"); 425 removeServerQueue(serverName); 426 } 427 } finally { 428 schedUnlock(); 429 } 430 } 431 432 private static int getBucketIndex(Object[] buckets, int hashCode) { 433 return Math.abs(hashCode) % buckets.length; 434 } 435 436 private static boolean isServerProcedure(Procedure<?> proc) { 437 return proc instanceof ServerProcedureInterface; 438 } 439 440 private static ServerName getServerName(Procedure<?> proc) { 441 return ((ServerProcedureInterface) proc).getServerName(); 442 } 443 444 // ============================================================================ 445 // Peer Queue Lookup Helpers 446 // ============================================================================ 447 private PeerQueue getPeerQueue(String peerId) { 448 PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 449 if (node != null) { 450 return node; 451 } 452 node = new PeerQueue(peerId, locking.getPeerLock(peerId)); 453 peerMap = AvlTree.insert(peerMap, node); 454 return node; 455 } 456 457 private void removePeerQueue(String peerId) { 458 peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 459 locking.removePeerLock(peerId); 460 } 461 462 private void tryCleanupPeerQueue(String peerId, Procedure procedure) { 463 schedLock(); 464 try { 465 PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 466 if (queue == null) { 467 return; 468 } 469 470 final LockAndQueue lock = locking.getPeerLock(peerId); 471 if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { 472 removeFromRunQueue(peerRunQueue, queue, 473 () -> "clean up peer queue after " + procedure + " completed"); 474 removePeerQueue(peerId); 475 } 476 } finally { 477 schedUnlock(); 478 } 479 } 480 481 private static boolean isPeerProcedure(Procedure<?> proc) { 482 return proc instanceof PeerProcedureInterface; 483 } 484 485 private static String getPeerId(Procedure<?> proc) { 486 return ((PeerProcedureInterface) proc).getPeerId(); 487 } 488 489 // ============================================================================ 490 // Meta Queue Lookup Helpers 491 // ============================================================================ 492 private MetaQueue getMetaQueue() { 493 MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR); 494 if (node != null) { 495 return node; 496 } 497 node = new MetaQueue(locking.getMetaLock()); 498 metaMap = AvlTree.insert(metaMap, node); 499 return node; 500 } 501 502 private static boolean isMetaProcedure(Procedure<?> proc) { 503 return proc instanceof MetaProcedureInterface; 504 } 505 506 // ============================================================================ 507 // Table Locking Helpers 508 // ============================================================================ 509 /** 510 * Get lock info for a resource of specified type and name and log details 511 */ 512 private void logLockedResource(LockedResourceType resourceType, String resourceName) { 513 if (!LOG.isDebugEnabled()) { 514 return; 515 } 516 517 LockedResource lockedResource = getLockResource(resourceType, resourceName); 518 if (lockedResource != null) { 519 String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" 520 + lockedResource.getSharedLockCount(); 521 522 Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure(); 523 if (proc != null) { 524 msg += ", exclusively locked by procId=" + proc.getProcId(); 525 } 526 LOG.debug(msg); 527 } 528 } 529 530 /** 531 * Suspend the procedure if the specified table is already locked. Other operations in the 532 * table-queue will be executed after the lock is released. 533 * @param procedure the procedure trying to acquire the lock 534 * @param table Table to lock 535 * @return true if the procedure has to wait for the table to be available 536 */ 537 public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 538 schedLock(); 539 try { 540 final String namespace = table.getNamespaceAsString(); 541 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 542 final LockAndQueue tableLock = locking.getTableLock(table); 543 if (!namespaceLock.trySharedLock(procedure)) { 544 waitProcedure(namespaceLock, procedure); 545 logLockedResource(LockedResourceType.NAMESPACE, namespace); 546 return true; 547 } 548 if (!tableLock.tryExclusiveLock(procedure)) { 549 namespaceLock.releaseSharedLock(); 550 waitProcedure(tableLock, procedure); 551 logLockedResource(LockedResourceType.TABLE, table.getNameAsString()); 552 return true; 553 } 554 removeFromRunQueue(tableRunQueue, getTableQueue(table), 555 () -> procedure + " held the exclusive lock"); 556 return false; 557 } finally { 558 schedUnlock(); 559 } 560 } 561 562 /** 563 * Wake the procedures waiting for the specified table 564 * @param procedure the procedure releasing the lock 565 * @param table the name of the table that has the exclusive lock 566 */ 567 public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 568 schedLock(); 569 try { 570 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 571 final LockAndQueue tableLock = locking.getTableLock(table); 572 int waitingCount = 0; 573 if (tableLock.releaseExclusiveLock(procedure)) { 574 waitingCount += wakeWaitingProcedures(tableLock); 575 } 576 if (namespaceLock.releaseSharedLock()) { 577 waitingCount += wakeWaitingProcedures(namespaceLock); 578 } 579 addToRunQueue(tableRunQueue, getTableQueue(table), 580 () -> procedure + " released the exclusive lock"); 581 wakePollIfNeeded(waitingCount); 582 } finally { 583 schedUnlock(); 584 } 585 } 586 587 /** 588 * Suspend the procedure if the specified table is already locked. other "read" operations in the 589 * table-queue may be executed concurrently, 590 * @param procedure the procedure trying to acquire the lock 591 * @param table Table to lock 592 * @return true if the procedure has to wait for the table to be available 593 */ 594 public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) { 595 return waitTableQueueSharedLock(procedure, table) == null; 596 } 597 598 private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) { 599 schedLock(); 600 try { 601 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 602 final LockAndQueue tableLock = locking.getTableLock(table); 603 if (!namespaceLock.trySharedLock(procedure)) { 604 waitProcedure(namespaceLock, procedure); 605 return null; 606 } 607 608 if (!tableLock.trySharedLock(procedure)) { 609 namespaceLock.releaseSharedLock(); 610 waitProcedure(tableLock, procedure); 611 return null; 612 } 613 614 return getTableQueue(table); 615 } finally { 616 schedUnlock(); 617 } 618 } 619 620 /** 621 * Wake the procedures waiting for the specified table 622 * @param procedure the procedure releasing the lock 623 * @param table the name of the table that has the shared lock 624 */ 625 public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) { 626 schedLock(); 627 try { 628 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 629 final LockAndQueue tableLock = locking.getTableLock(table); 630 int waitingCount = 0; 631 if (tableLock.releaseSharedLock()) { 632 addToRunQueue(tableRunQueue, getTableQueue(table), 633 () -> procedure + " released the shared lock"); 634 waitingCount += wakeWaitingProcedures(tableLock); 635 } 636 if (namespaceLock.releaseSharedLock()) { 637 waitingCount += wakeWaitingProcedures(namespaceLock); 638 } 639 wakePollIfNeeded(waitingCount); 640 } finally { 641 schedUnlock(); 642 } 643 } 644 645 /** 646 * Tries to remove the queue and the table-lock of the specified table. If there are new 647 * operations pending (e.g. a new create), the remove will not be performed. 648 * @param table the name of the table that should be marked as deleted 649 * @param procedure the procedure that is removing the table 650 * @return true if deletion succeeded, false otherwise meaning that there are other new operations 651 * pending for that table (e.g. a new create). 652 */ 653 boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) { 654 schedLock(); 655 try { 656 final TableQueue queue = getTableQueue(table); 657 final LockAndQueue tableLock = locking.getTableLock(table); 658 if (queue == null) return true; 659 660 if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { 661 // remove the table from the run-queue and the map 662 if (AvlIterableList.isLinked(queue)) { 663 tableRunQueue.remove(queue); 664 } 665 removeTableQueue(table); 666 } else { 667 // TODO: If there are no create, we can drop all the other ops 668 return false; 669 } 670 } finally { 671 schedUnlock(); 672 } 673 return true; 674 } 675 676 // ============================================================================ 677 // Region Locking Helpers 678 // ============================================================================ 679 /** 680 * Suspend the procedure if the specified region is already locked. 681 * @param procedure the procedure trying to acquire the lock on the region 682 * @param regionInfo the region we are trying to lock 683 * @return true if the procedure has to wait for the regions to be available 684 */ 685 public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 686 return waitRegions(procedure, regionInfo.getTable(), regionInfo); 687 } 688 689 /** 690 * Suspend the procedure if the specified set of regions are already locked. 691 * @param procedure the procedure trying to acquire the lock on the regions 692 * @param table the table name of the regions we are trying to lock 693 * @param regionInfos the list of regions we are trying to lock 694 * @return true if the procedure has to wait for the regions to be available 695 */ 696 public boolean waitRegions(final Procedure<?> procedure, final TableName table, 697 final RegionInfo... regionInfos) { 698 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 699 schedLock(); 700 try { 701 assert table != null; 702 if (waitTableSharedLock(procedure, table)) { 703 return true; 704 } 705 706 // acquire region xlocks or wait 707 boolean hasLock = true; 708 final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length]; 709 for (int i = 0; i < regionInfos.length; ++i) { 710 assert regionInfos[i] != null; 711 assert regionInfos[i].getTable() != null; 712 assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure; 713 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 714 : "duplicate region: " + regionInfos[i]; 715 716 regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName()); 717 if (!regionLocks[i].tryExclusiveLock(procedure)) { 718 LOG.info("Waiting on xlock for {} held by pid={}", procedure, 719 regionLocks[i].getExclusiveLockProcIdOwner()); 720 waitProcedure(regionLocks[i], procedure); 721 hasLock = false; 722 while (i-- > 0) { 723 regionLocks[i].releaseExclusiveLock(procedure); 724 } 725 break; 726 } else { 727 LOG.info("Took xlock for {}", procedure); 728 } 729 } 730 731 if (!hasLock) { 732 wakeTableSharedLock(procedure, table); 733 } 734 return !hasLock; 735 } finally { 736 schedUnlock(); 737 } 738 } 739 740 /** 741 * Wake the procedures waiting for the specified region 742 * @param procedure the procedure that was holding the region 743 * @param regionInfo the region the procedure was holding 744 */ 745 public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 746 wakeRegions(procedure, regionInfo.getTable(), regionInfo); 747 } 748 749 /** 750 * Wake the procedures waiting for the specified regions 751 * @param procedure the procedure that was holding the regions 752 * @param regionInfos the list of regions the procedure was holding 753 */ 754 public void wakeRegions(final Procedure<?> procedure, final TableName table, 755 final RegionInfo... regionInfos) { 756 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 757 schedLock(); 758 try { 759 int numProcs = 0; 760 final Procedure<?>[] nextProcs = new Procedure[regionInfos.length]; 761 for (int i = 0; i < regionInfos.length; ++i) { 762 assert regionInfos[i].getTable().equals(table); 763 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 764 : "duplicate region: " + regionInfos[i]; 765 766 LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName()); 767 if (regionLock.releaseExclusiveLock(procedure)) { 768 if (!regionLock.isWaitingQueueEmpty()) { 769 // release one procedure at the time since regions has an xlock 770 nextProcs[numProcs++] = regionLock.removeFirst(); 771 } else { 772 locking.removeRegionLock(regionInfos[i].getEncodedName()); 773 } 774 } 775 } 776 777 // awake procedures if any 778 for (int i = numProcs - 1; i >= 0; --i) { 779 wakeProcedure(nextProcs[i]); 780 } 781 wakePollIfNeeded(numProcs); 782 // release the table shared-lock. 783 wakeTableSharedLock(procedure, table); 784 } finally { 785 schedUnlock(); 786 } 787 } 788 789 // ============================================================================ 790 // Namespace Locking Helpers 791 // ============================================================================ 792 /** 793 * Suspend the procedure if the specified namespace is already locked. 794 * @see #wakeNamespaceExclusiveLock(Procedure,String) 795 * @param procedure the procedure trying to acquire the lock 796 * @param namespace Namespace to lock 797 * @return true if the procedure has to wait for the namespace to be available 798 */ 799 public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) { 800 schedLock(); 801 try { 802 final LockAndQueue systemNamespaceTableLock = 803 locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); 804 if (!systemNamespaceTableLock.trySharedLock(procedure)) { 805 waitProcedure(systemNamespaceTableLock, procedure); 806 logLockedResource(LockedResourceType.TABLE, 807 TableName.NAMESPACE_TABLE_NAME.getNameAsString()); 808 return true; 809 } 810 811 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 812 if (!namespaceLock.tryExclusiveLock(procedure)) { 813 systemNamespaceTableLock.releaseSharedLock(); 814 waitProcedure(namespaceLock, procedure); 815 logLockedResource(LockedResourceType.NAMESPACE, namespace); 816 return true; 817 } 818 return false; 819 } finally { 820 schedUnlock(); 821 } 822 } 823 824 /** 825 * Wake the procedures waiting for the specified namespace 826 * @see #waitNamespaceExclusiveLock(Procedure,String) 827 * @param procedure the procedure releasing the lock 828 * @param namespace the namespace that has the exclusive lock 829 */ 830 public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) { 831 schedLock(); 832 try { 833 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 834 final LockAndQueue systemNamespaceTableLock = 835 locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); 836 int waitingCount = 0; 837 if (namespaceLock.releaseExclusiveLock(procedure)) { 838 waitingCount += wakeWaitingProcedures(namespaceLock); 839 } 840 if (systemNamespaceTableLock.releaseSharedLock()) { 841 addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME), 842 () -> procedure + " released namespace exclusive lock"); 843 waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); 844 } 845 wakePollIfNeeded(waitingCount); 846 } finally { 847 schedUnlock(); 848 } 849 } 850 851 // ============================================================================ 852 // Server Locking Helpers 853 // ============================================================================ 854 /** 855 * Try to acquire the exclusive lock on the specified server. 856 * @see #wakeServerExclusiveLock(Procedure,ServerName) 857 * @param procedure the procedure trying to acquire the lock 858 * @param serverName Server to lock 859 * @return true if the procedure has to wait for the server to be available 860 */ 861 public boolean waitServerExclusiveLock(final Procedure<?> procedure, 862 final ServerName serverName) { 863 schedLock(); 864 try { 865 final LockAndQueue lock = locking.getServerLock(serverName); 866 if (lock.tryExclusiveLock(procedure)) { 867 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 868 // so. 869 removeFromRunQueue(serverRunQueue, 870 getServerQueue(serverName, 871 procedure instanceof ServerProcedureInterface 872 ? (ServerProcedureInterface) procedure 873 : null), 874 () -> procedure + " held exclusive lock"); 875 return false; 876 } 877 waitProcedure(lock, procedure); 878 logLockedResource(LockedResourceType.SERVER, serverName.getServerName()); 879 return true; 880 } finally { 881 schedUnlock(); 882 } 883 } 884 885 /** 886 * Wake the procedures waiting for the specified server 887 * @see #waitServerExclusiveLock(Procedure,ServerName) 888 * @param procedure the procedure releasing the lock 889 * @param serverName the server that has the exclusive lock 890 */ 891 public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) { 892 schedLock(); 893 try { 894 final LockAndQueue lock = locking.getServerLock(serverName); 895 // Only SCP will acquire/release server lock so do not need to check the return value here. 896 lock.releaseExclusiveLock(procedure); 897 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 898 // so. 899 addToRunQueue(serverRunQueue, 900 getServerQueue(serverName, 901 procedure instanceof ServerProcedureInterface 902 ? (ServerProcedureInterface) procedure 903 : null), 904 () -> procedure + " released exclusive lock"); 905 int waitingCount = wakeWaitingProcedures(lock); 906 wakePollIfNeeded(waitingCount); 907 } finally { 908 schedUnlock(); 909 } 910 } 911 912 // ============================================================================ 913 // Peer Locking Helpers 914 // ============================================================================ 915 private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { 916 return proc.getPeerOperationType() != PeerOperationType.REFRESH; 917 } 918 919 /** 920 * Try to acquire the exclusive lock on the specified peer. 921 * @see #wakePeerExclusiveLock(Procedure, String) 922 * @param procedure the procedure trying to acquire the lock 923 * @param peerId peer to lock 924 * @return true if the procedure has to wait for the peer to be available 925 */ 926 public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) { 927 schedLock(); 928 try { 929 final LockAndQueue lock = locking.getPeerLock(peerId); 930 if (lock.tryExclusiveLock(procedure)) { 931 removeFromRunQueue(peerRunQueue, getPeerQueue(peerId), 932 () -> procedure + " held exclusive lock"); 933 return false; 934 } 935 waitProcedure(lock, procedure); 936 logLockedResource(LockedResourceType.PEER, peerId); 937 return true; 938 } finally { 939 schedUnlock(); 940 } 941 } 942 943 /** 944 * Wake the procedures waiting for the specified peer 945 * @see #waitPeerExclusiveLock(Procedure, String) 946 * @param procedure the procedure releasing the lock 947 * @param peerId the peer that has the exclusive lock 948 */ 949 public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) { 950 schedLock(); 951 try { 952 final LockAndQueue lock = locking.getPeerLock(peerId); 953 if (lock.releaseExclusiveLock(procedure)) { 954 addToRunQueue(peerRunQueue, getPeerQueue(peerId), 955 () -> procedure + " released exclusive lock"); 956 int waitingCount = wakeWaitingProcedures(lock); 957 wakePollIfNeeded(waitingCount); 958 } 959 } finally { 960 schedUnlock(); 961 } 962 } 963 964 // ============================================================================ 965 // Meta Locking Helpers 966 // ============================================================================ 967 /** 968 * Try to acquire the exclusive lock on meta. 969 * @see #wakeMetaExclusiveLock(Procedure) 970 * @param procedure the procedure trying to acquire the lock 971 * @return true if the procedure has to wait for meta to be available 972 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 973 * {@link RecoverMetaProcedure}. 974 */ 975 @Deprecated 976 public boolean waitMetaExclusiveLock(Procedure<?> procedure) { 977 schedLock(); 978 try { 979 final LockAndQueue lock = locking.getMetaLock(); 980 if (lock.tryExclusiveLock(procedure)) { 981 removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock"); 982 return false; 983 } 984 waitProcedure(lock, procedure); 985 logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString()); 986 return true; 987 } finally { 988 schedUnlock(); 989 } 990 } 991 992 /** 993 * Wake the procedures waiting for meta. 994 * @see #waitMetaExclusiveLock(Procedure) 995 * @param procedure the procedure releasing the lock 996 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 997 * {@link RecoverMetaProcedure}. 998 */ 999 @Deprecated 1000 public void wakeMetaExclusiveLock(Procedure<?> procedure) { 1001 schedLock(); 1002 try { 1003 final LockAndQueue lock = locking.getMetaLock(); 1004 lock.releaseExclusiveLock(procedure); 1005 addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock"); 1006 int waitingCount = wakeWaitingProcedures(lock); 1007 wakePollIfNeeded(waitingCount); 1008 } finally { 1009 schedUnlock(); 1010 } 1011 } 1012 1013 /** 1014 * For debugging. Expensive. 1015 */ 1016 public String dumpLocks() throws IOException { 1017 schedLock(); 1018 try { 1019 // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter 1020 return this.locking.toString(); 1021 } finally { 1022 schedUnlock(); 1023 } 1024 } 1025}