001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.function.Function; 024import java.util.function.Supplier; 025import org.apache.commons.lang3.builder.ToStringBuilder; 026import org.apache.commons.lang3.builder.ToStringStyle; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.TableExistsException; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.TableNotFoundException; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; 034import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; 035import org.apache.hadoop.hbase.procedure2.LockAndQueue; 036import org.apache.hadoop.hbase.procedure2.LockStatus; 037import org.apache.hadoop.hbase.procedure2.LockedResource; 038import org.apache.hadoop.hbase.procedure2.LockedResourceType; 039import org.apache.hadoop.hbase.procedure2.Procedure; 040import 
org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 041import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; 042import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; 043import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the 050 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the 051 * master operations can be executed concurrently, if they are operating on different tables (e.g. 052 * two create table procedures can be performed at the same time) or against two different servers; 053 * say two servers that crashed at about the same time. 054 * <p> 055 * Each procedure should implement an Interface providing information for this queue. For example 056 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed 057 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can 058 * abort all the operations preceding a delete table, or similar. 059 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue, 060 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). 
This mainly 061 * includes:<br> 062 * <ul> 063 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when: 064 * <ol> 065 * <li>Queue was empty before push (so must have been out of run-queue)</li> 066 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have 067 * moved Queue out of run-queue)</li> 068 * </ol> 069 * </li> 070 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when: 071 * <ol> 072 * <li>Queue becomes empty after poll</li> 073 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the 074 * procedure)</li> 075 * <li>Exclusive lock is requested but lock is not available (returns null)</li> 076 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a 077 * child</li> 078 * </ol> 079 * </li> 080 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is: 081 * <ol> 082 * <li>Exclusive lock</li> 083 * <li>Last shared lock (in case queue was removed because next procedure in queue required 084 * exclusive lock)</li> 085 * </ol> 086 * </li> 087 * </ul> 088 */ 089@InterfaceAudience.Private 090public class MasterProcedureScheduler extends AbstractProcedureScheduler { 091 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); 092 093 private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR = 094 (n, k) -> n.compareKey((ServerName) k); 095 private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR = 096 (n, k) -> n.compareKey((TableName) k); 097 private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR = 098 (n, k) -> n.compareKey((String) k); 099 private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = 100 (n, k) -> n.compareKey((TableName) k); 101 private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR = 102 (n, k) -> 
n.compareKey((String) k);

  // Run queues, one FairQueue per resource family. A per-key Queue is linked into its run queue
  // only while it may hold a dispatchable procedure (see addToRunQueue/removeFromRunQueue).
  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
  private final FairQueue<String> peerRunQueue = new FairQueue<>();
  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
  private final FairQueue<String> globalRunQueue = new FairQueue<>();

  // Roots of the AVL trees that hold the per-key queues; null means the tree is empty. Server
  // queues are spread over a fixed array of trees, the slot being chosen by getBucketIndex().
  private final ServerQueue[] serverBuckets = new ServerQueue[128];
  private TableQueue tableMap = null;
  private PeerQueue peerMap = null;
  private MetaQueue metaMap = null;
  private GlobalQueue globalMap = null;

  private final SchemaLocking locking;

  /**
   * @param procedureRetriever passed through to {@link SchemaLocking}; presumably resolves a
   *                           procedure id to its {@link Procedure} (TODO confirm)
   */
  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
    locking = new SchemaLocking(procedureRetriever);
  }

  /**
   * Hands a yielding procedure back to the scheduler through {@code push(proc, false, true)}.
   */
  @Override
  public void yield(final Procedure proc) {
    push(proc, false, true);
  }

  /**
   * Adds {@code proc} to the per-key queue of the resource it works on and, when appropriate, puts
   * that queue on the matching run queue (see {@link #doAdd}). The resource kind is decided by the
   * first interface that matches, checked in this order: meta, table, server, peer, global.
   * @param proc     the procedure to enqueue
   * @param addFront forwarded to {@code Queue.add}; whether to insert at the head of the queue
   * @throws UnsupportedOperationException if {@code proc} implements none of the supported
   *                                       procedure interfaces
   */
  @Override
  protected void enqueue(final Procedure proc, final boolean addFront) {
    if (isMetaProcedure(proc)) {
      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
    } else if (isTableProcedure(proc)) {
      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
    } else if (isServerProcedure(proc)) {
      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
    } else if (isPeerProcedure(proc)) {
      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
    } else if (isGlobalProcedure(proc)) {
      doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
    } else {
      // Only meta, table, server, peer and global procedures have queues. To schedule another
      // kind of procedure, either group it under one of the existing keys or introduce a new key
      // type with its own queue tree and run queue, modelled on the ones above.
      throw new UnsupportedOperationException(
        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
    }
  }

  /**
   * Adds {@code proc} to {@code queue} and links the queue into {@code fairq} when the procedure
   * may be able to run right away, i.e. when any of these holds:
   * <ol>
   * <li>the procedure already holds its lock, or restores a lock it held before a restart;</li>
   * <li>nobody holds the exclusive lock of this queue;</li>
   * <li>the procedure has access to the exclusive lock of this queue.</li>
   * </ol>
   * Otherwise the run-queue membership of {@code queue} is left untouched.
   */
  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
    Procedure<?> proc, boolean addFront) {
    queue.add(proc, addFront);
    // For the following conditions, we will put the queue back into execution
    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
    // which means it can be executed immediately.
    // 2. The exclusive lock for this queue has not been held.
    // 3. The given procedure has the exclusive lock permission for this queue.
    Supplier<String> reason = null;
    if (proc.hasLock()) {
      reason = () -> proc + " has lock";
    } else if (proc.isLockedWhenLoading()) {
      reason = () -> proc + " restores lock when restarting";
    } else if (!queue.getLockStatus().hasExclusiveLock()) {
      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
    } else if (queue.getLockStatus().hasLockAccess(proc)) {
      reason = () -> proc + " has the exclusive lock access";
    }
    if (reason != null) {
      addToRunQueue(fairq, queue, reason);
    }
  }

  /** Returns true if any of the run queues reports runnable queues. */
  @Override
  protected boolean queueHasRunnables() {
    return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
      || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
      || peerRunQueue.hasRunnables();
  }

  /**
   * Returns the next procedure to execute, trying the run queues in priority order: global, meta,
   * server, peer, table. Returns null if none of them yields a ready procedure.
   */
  @Override
  protected Procedure dequeue() {
    // pull global first
    Procedure<?> pollResult = doPoll(globalRunQueue);
    // then meta procedure
    if (pollResult == null) {
      pollResult = doPoll(metaRunQueue);
    }
    // For now, let server handling have precedence over table handling; presumption is that it
    // is more important handling crashed servers than it is running the
    // enabling/disabling tables, etc.
    if (pollResult == null) {
      pollResult = doPoll(serverRunQueue);
    }
    if (pollResult == null) {
      pollResult = doPoll(peerRunQueue);
    }
    if (pollResult == null) {
      pollResult = doPoll(tableRunQueue);
    }
    return pollResult;
  }

  /**
   * Checks whether {@code proc} could get the lock it needs on {@code rq} right now.
   * @return true if the procedure already has lock access; otherwise, when it needs the exclusive
   *         lock, true only if the queue is not locked at all, and when a shared lock is enough,
   *         true only if nobody holds the exclusive lock
   */
  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
    LockStatus s = rq.getLockStatus();
    // if we have the lock access, we are ready
    if (s.hasLockAccess(proc)) {
      return true;
    }
    boolean xlockReq = rq.requireExclusiveLock(proc);
    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
  }

  /**
   * Polls the next ready procedure from {@code fairq}.
   * <p>
   * Takes the next queue from the run queue and cycles through at most {@code rq.size()} of its
   * procedures, returning the first one that passes {@link #isLockReady}; procedures that are not
   * ready are re-added at the back. The queue is unlinked from the run queue when it becomes empty
   * or when none of its procedures is ready.
   * @return the procedure to execute, or null if there is no available queue or nothing is ready
   */
  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
    Queue<T> rq = fairq.poll();
    if (rq == null || !rq.isAvailable()) {
      return null;
    }
    // loop until we find out a procedure which is ready to run, or if we have checked all the
    // procedures, then we give up and remove the queue from run queue.
    for (int i = 0, n = rq.size(); i < n; i++) {
      Procedure<?> proc = rq.poll();
      if (isLockReady(proc, rq)) {
        // the queue is empty, remove from run queue
        if (rq.isEmpty()) {
          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
        }
        return proc;
      }
      // we are not ready to run, add back and try the next procedure
      rq.add(proc, false);
    }
    // no procedure is ready for execution, remove from run queue
    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
    return null;
  }

  /** Returns the locks currently known to the scheduler, read under the scheduler lock. */
  @Override
  public List<LockedResource> getLocks() {
    schedLock();
    try {
      return locking.getLocks();
    } finally {
      schedUnlock();
    }
  }

  /** Returns the lock of the given resource, read under the scheduler lock. */
  @Override
  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
    schedLock();
    try {
      return locking.getLockResource(resourceType, resourceName);
    } finally {
      schedUnlock();
    }
  }

  /** Drops every queue and every lock, under the scheduler lock. */
  @Override
  public void clear() {
    schedLock();
    try {
      clearQueue();
      locking.clear();
    } finally {
      schedUnlock();
    }
  }

  /** Empties every queue tree and unlinks all queues from their run queues. */
  private void clearQueue() {
    // Remove Servers
    for (int i = 0; i < serverBuckets.length; ++i) {
      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
      serverBuckets[i] = null;
    }

    // Remove Tables
    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
    tableMap = null;

    // Remove Peers
    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
    peerMap = null;

    // Remove Meta
    clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
    metaMap = null;

    // Remove Global
    clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
    globalMap = null;

    assert size() == 0 : "expected queue size to be 0, got " + size();
  }

  /**
   * Removes every node of the AVL tree rooted at {@code treeMap}, unlinking each one from
   * {@code fairq} when {@code fairq} is non-null. Only the local root reference is updated; the
   * caller must reset its own field.
   */
  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
    while (treeMap != null) {
      Queue<T> node = AvlTree.getFirst(treeMap);
      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
      if (fairq != null) {
        removeFromRunQueue(fairq, node, () -> "clear all queues");
      }
    }
  }

  /** Sums the sizes of all queues in the AVL tree rooted at {@code head} (0 for null). */
  private int queueSize(Queue<?> head) {
    int count = 0;
    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
    while (iter.hasNext()) {
      count += iter.next().size();
    }
    return count;
  }

  /** Returns the number of procedures held in all queue trees. */
  @Override
  protected int queueSize() {
    int count = 0;
    for (ServerQueue serverMap : serverBuckets) {
      count += queueSize(serverMap);
    }
    count += queueSize(tableMap);
    count += queueSize(peerMap);
    count += queueSize(metaMap);
    count += queueSize(globalMap);
    return count;
  }

  /**
   * Releases scheduler resources tied to a finished procedure.
   * <ul>
   * <li>Table procedures: when the table is considered gone (a DELETE succeeded, a CREATE failed
   * with anything but TableExistsException, or another operation failed with
   * TableNotFoundException) the table queue and lock are dropped via markTableAsDeleted.</li>
   * <li>Peer, server and global procedures: the matching queue and lock are dropped if the queue
   * exists, is empty and its exclusive lock can be taken.</li>
   * </ul>
   * Other procedure types need no cleanup.
   */
  @Override
  public void completionCleanup(final Procedure proc) {
    if (proc instanceof TableProcedureInterface) {
      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
      boolean tableDeleted;
      if (proc.hasException()) {
        Exception procEx = proc.getException().unwrapRemoteException();
        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
          // create failed because the table already exist
          tableDeleted = !(procEx instanceof TableExistsException);
        } else {
          // the operation failed because the table does not exist
          tableDeleted = (procEx instanceof TableNotFoundException);
        }
      } else {
        // the table was deleted
        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
      }
      if (tableDeleted) {
        markTableAsDeleted(iProcTable.getTableName(), proc);
        return;
      }
    } else if (proc instanceof PeerProcedureInterface) {
      tryCleanupPeerQueue(getPeerId(proc), proc);
    } else if (proc instanceof ServerProcedureInterface) {
      tryCleanupServerQueue(getServerName(proc), proc);
    } else if (proc instanceof GlobalProcedureInterface) {
      // Global queues are created on demand by enqueue(); without this branch they (and their
      // locks) were never removed. tryCleanupGlobalQueue is a no-op if the queue is already gone.
      tryCleanupGlobalQueue(getGlobalId(proc), proc);
    } else {
      // No cleanup for other procedure types, yet.
      return;
    }
  }

  /**
   * Links {@code queue} into {@code fairq} unless it is already linked or empty. The reason is only
   * evaluated when trace logging is enabled.
   */
  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
    Supplier<String> reason) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
    }
    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
      fairq.add(queue);
    }
  }

  /**
   * Unlinks {@code queue} from {@code fairq} if it is linked. The reason is only evaluated when
   * trace logging is enabled.
   */
  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
    Queue<T> queue, Supplier<String> reason) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
    }
    if (AvlIterableList.isLinked(queue)) {
      fairq.remove(queue);
    }
  }

  // ============================================================================
  // Table Queue Lookup Helpers
  // ============================================================================
  /**
   * Returns the queue of {@code tableName}, creating it (with the table and namespace locks) when
   * it does not exist yet. Never returns null.
   */
  private TableQueue getTableQueue(TableName tableName) {
    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
    if (node != null) return node;

    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
    tableMap = AvlTree.insert(tableMap, node);
    return node;
  }

  /** Removes the queue of {@code tableName} from the table tree and drops its table lock. */
  private void removeTableQueue(TableName tableName) {
    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
    locking.removeTableLock(tableName);
  }

  private static boolean isTableProcedure(Procedure<?> proc) {
    return proc instanceof TableProcedureInterface;
  }

  private static TableName getTableName(Procedure<?> proc) {
    return ((TableProcedureInterface) proc).getTableName();
  }

  // ============================================================================
  // Server Queue Lookup Helpers
============================================================================ 410 private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { 411 final int index = getBucketIndex(serverBuckets, serverName.hashCode()); 412 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 413 if (node != null) { 414 return node; 415 } 416 int priority; 417 if (proc != null) { 418 priority = MasterProcedureUtil.getServerPriority(proc); 419 } else { 420 priority = 1; 421 } 422 node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); 423 serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); 424 return node; 425 } 426 427 private void removeServerQueue(ServerName serverName) { 428 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 429 serverBuckets[index] = 430 AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 431 locking.removeServerLock(serverName); 432 } 433 434 private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) { 435 schedLock(); 436 try { 437 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 438 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 439 if (node == null) { 440 return; 441 } 442 443 LockAndQueue lock = locking.getServerLock(serverName); 444 if (node.isEmpty() && lock.tryExclusiveLock(proc)) { 445 removeFromRunQueue(serverRunQueue, node, 446 () -> "clean up server queue after " + proc + " completed"); 447 removeServerQueue(serverName); 448 } 449 } finally { 450 schedUnlock(); 451 } 452 } 453 454 private static int getBucketIndex(Object[] buckets, int hashCode) { 455 return Math.abs(hashCode) % buckets.length; 456 } 457 458 private static boolean isServerProcedure(Procedure<?> proc) { 459 return proc instanceof ServerProcedureInterface; 460 } 461 462 private static ServerName getServerName(Procedure<?> proc) { 463 return 
((ServerProcedureInterface) proc).getServerName(); 464 } 465 466 // ============================================================================ 467 // Peer Queue Lookup Helpers 468 // ============================================================================ 469 private PeerQueue getPeerQueue(String peerId) { 470 PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 471 if (node != null) { 472 return node; 473 } 474 node = new PeerQueue(peerId, locking.getPeerLock(peerId)); 475 peerMap = AvlTree.insert(peerMap, node); 476 return node; 477 } 478 479 private void removePeerQueue(String peerId) { 480 peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 481 locking.removePeerLock(peerId); 482 } 483 484 private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) { 485 schedLock(); 486 try { 487 PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 488 if (queue == null) { 489 return; 490 } 491 492 final LockAndQueue lock = locking.getPeerLock(peerId); 493 if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { 494 removeFromRunQueue(peerRunQueue, queue, 495 () -> "clean up peer queue after " + procedure + " completed"); 496 removePeerQueue(peerId); 497 } 498 } finally { 499 schedUnlock(); 500 } 501 } 502 503 private static boolean isPeerProcedure(Procedure<?> proc) { 504 return proc instanceof PeerProcedureInterface; 505 } 506 507 private static String getPeerId(Procedure<?> proc) { 508 return ((PeerProcedureInterface) proc).getPeerId(); 509 } 510 511 // ============================================================================ 512 // Meta Queue Lookup Helpers 513 // ============================================================================ 514 private MetaQueue getMetaQueue() { 515 MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR); 516 if (node != null) { 517 return node; 518 } 519 node = new MetaQueue(locking.getMetaLock()); 520 metaMap = 
AvlTree.insert(metaMap, node); 521 return node; 522 } 523 524 private static boolean isMetaProcedure(Procedure<?> proc) { 525 return proc instanceof MetaProcedureInterface; 526 } 527 528 // ============================================================================ 529 // Global Queue Lookup Helpers 530 // ============================================================================ 531 private GlobalQueue getGlobalQueue(String globalId) { 532 GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); 533 if (node != null) { 534 return node; 535 } 536 node = new GlobalQueue(globalId, locking.getGlobalLock(globalId)); 537 globalMap = AvlTree.insert(globalMap, node); 538 return node; 539 } 540 541 private void removeGlobalQueue(String globalId) { 542 globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); 543 locking.removeGlobalLock(globalId); 544 } 545 546 private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) { 547 schedLock(); 548 try { 549 GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); 550 if (queue == null) { 551 return; 552 } 553 554 final LockAndQueue lock = locking.getGlobalLock(globalId); 555 if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { 556 removeFromRunQueue(globalRunQueue, queue, 557 () -> "clean up global queue after " + procedure + " completed"); 558 removeGlobalQueue(globalId); 559 } 560 } finally { 561 schedUnlock(); 562 } 563 } 564 565 private static boolean isGlobalProcedure(Procedure<?> proc) { 566 return proc instanceof GlobalProcedureInterface; 567 } 568 569 private static String getGlobalId(Procedure<?> proc) { 570 return ((GlobalProcedureInterface) proc).getGlobalId(); 571 } 572 573 // ============================================================================ 574 // Table Locking Helpers 575 // ============================================================================ 576 /** 577 * Get lock info for a resource of specified 
* type and name and log details.
   * <p>
   * Does nothing unless debug logging is enabled. Otherwise the resource is looked up through
   * {@link #getLockResource}, which takes {@code schedLock()} itself; callers in this class already
   * hold that lock, so this relies on it being re-entrant (TODO confirm). When the lookup returns a
   * resource, its shared lock count and, if present, the procId of the exclusive owner are logged.
   */
  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
    if (!LOG.isDebugEnabled()) {
      return;
    }

    LockedResource lockedResource = getLockResource(resourceType, resourceName);
    if (lockedResource != null) {
      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count="
        + lockedResource.getSharedLockCount();

      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
      if (proc != null) {
        msg += ", exclusively locked by procId=" + proc.getProcId();
      }
      LOG.debug(msg);
    }
  }

  /**
   * Suspend the procedure if the specified table is already locked. Other operations in the
   * table-queue will be executed after the lock is released.
   * <p>
   * Takes a shared lock on the table's namespace and then the exclusive lock on the table. If
   * either lock cannot be taken, the procedure is handed to {@code waitProcedure} on that lock and
   * no lock is left held by it: the namespace shared lock is given back before waiting on the
   * table lock. On success the table queue is unlinked from the table run queue.
   * @param procedure the procedure trying to acquire the lock
   * @param table     Table to lock
   * @return true if the procedure has to wait for the table to be available
   */
  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final String namespace = table.getNamespaceAsString();
      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      final LockAndQueue tableLock = locking.getTableLock(table);
      // A table exclusive lock also requires a shared lock on the enclosing namespace.
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        logLockedResource(LockedResourceType.NAMESPACE, namespace);
        return true;
      }
      if (!tableLock.tryExclusiveLock(procedure)) {
        // Give back the namespace shared lock taken above before waiting on the table lock.
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
        return true;
      }
      // While the exclusive lock is held, the table queue stays off the run queue;
      // wakeTableExclusiveLock adds it back.
      removeFromRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " held the exclusive lock");
      return false;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified table
   * @param procedure the
* procedure releasing the lock
   * @param table     the name of the table that has the exclusive lock
   *                  <p>
   *                  Releases the table exclusive lock and the namespace shared lock taken by
   *                  waitTableExclusiveLock. Waiters of each lock are woken only when its release
   *                  call returns true. The table queue is then offered back to the table run
   *                  queue; addToRunQueue skips it if it is empty or already linked.
   */
  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      int waitingCount = 0;
      if (tableLock.releaseExclusiveLock(procedure)) {
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      addToRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " released the exclusive lock");
      // presumably signals the executor when waitingCount > 0 -- TODO confirm
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }

  /**
   * Suspend the procedure if the specified table is already locked. Other "read" operations in the
   * table-queue may be executed concurrently.
   * @param procedure the procedure trying to acquire the lock
   * @param table     Table to lock
   * @return true if the procedure has to wait for the table to be available
   */
  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
    return waitTableQueueSharedLock(procedure, table) == null;
  }

  /**
   * Takes a shared lock on the table's namespace and then a shared lock on the table.
   * <p>
   * If either lock cannot be taken, the procedure is handed to {@code waitProcedure} on that lock
   * and no lock is left held by it: the namespace shared lock is given back before waiting on the
   * table lock. Unlike waitTableExclusiveLock, this neither touches the run queue nor logs.
   * @return the table queue when both locks were taken, or null when the procedure has to wait
   */
  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        return null;
      }

      if (!tableLock.trySharedLock(procedure)) {
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        return null;
      }

      return getTableQueue(table);
    } finally {
      schedUnlock();
    }
  }

  /**
Wake the procedures waiting for the specified table 689 * @param procedure the procedure releasing the lock 690 * @param table the name of the table that has the shared lock 691 */ 692 public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) { 693 schedLock(); 694 try { 695 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 696 final LockAndQueue tableLock = locking.getTableLock(table); 697 int waitingCount = 0; 698 if (tableLock.releaseSharedLock()) { 699 addToRunQueue(tableRunQueue, getTableQueue(table), 700 () -> procedure + " released the shared lock"); 701 waitingCount += wakeWaitingProcedures(tableLock); 702 } 703 if (namespaceLock.releaseSharedLock()) { 704 waitingCount += wakeWaitingProcedures(namespaceLock); 705 } 706 wakePollIfNeeded(waitingCount); 707 } finally { 708 schedUnlock(); 709 } 710 } 711 712 /** 713 * Tries to remove the queue and the table-lock of the specified table. If there are new 714 * operations pending (e.g. a new create), the remove will not be performed. 715 * @param table the name of the table that should be marked as deleted 716 * @param procedure the procedure that is removing the table 717 * @return true if deletion succeeded, false otherwise meaning that there are other new operations 718 * pending for that table (e.g. a new create). 
719 */ 720 boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) { 721 schedLock(); 722 try { 723 final TableQueue queue = getTableQueue(table); 724 final LockAndQueue tableLock = locking.getTableLock(table); 725 if (queue == null) return true; 726 727 if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { 728 // remove the table from the run-queue and the map 729 if (AvlIterableList.isLinked(queue)) { 730 tableRunQueue.remove(queue); 731 } 732 removeTableQueue(table); 733 } else { 734 // TODO: If there are no create, we can drop all the other ops 735 return false; 736 } 737 } finally { 738 schedUnlock(); 739 } 740 return true; 741 } 742 743 // ============================================================================ 744 // Region Locking Helpers 745 // ============================================================================ 746 /** 747 * Suspend the procedure if the specified region is already locked. 748 * @param procedure the procedure trying to acquire the lock on the region 749 * @param regionInfo the region we are trying to lock 750 * @return true if the procedure has to wait for the regions to be available 751 */ 752 public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 753 return waitRegions(procedure, regionInfo.getTable(), regionInfo); 754 } 755 756 /** 757 * Suspend the procedure if the specified set of regions are already locked. 758 * @param procedure the procedure trying to acquire the lock on the regions 759 * @param table the table name of the regions we are trying to lock 760 * @param regionInfos the list of regions we are trying to lock 761 * @return true if the procedure has to wait for the regions to be available 762 */ 763 public boolean waitRegions(final Procedure<?> procedure, final TableName table, 764 final RegionInfo... 
regionInfos) { 765 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 766 schedLock(); 767 try { 768 assert table != null; 769 if (waitTableSharedLock(procedure, table)) { 770 return true; 771 } 772 773 // acquire region xlocks or wait 774 boolean hasLock = true; 775 final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length]; 776 for (int i = 0; i < regionInfos.length; ++i) { 777 assert regionInfos[i] != null; 778 assert regionInfos[i].getTable() != null; 779 assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure; 780 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 781 : "duplicate region: " + regionInfos[i]; 782 783 regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName()); 784 if (!regionLocks[i].tryExclusiveLock(procedure)) { 785 LOG.info("Waiting on xlock for {} held by pid={}", procedure, 786 regionLocks[i].getExclusiveLockProcIdOwner()); 787 waitProcedure(regionLocks[i], procedure); 788 hasLock = false; 789 while (i-- > 0) { 790 regionLocks[i].releaseExclusiveLock(procedure); 791 } 792 break; 793 } else { 794 LOG.info("Took xlock for {}", procedure); 795 } 796 } 797 798 if (!hasLock) { 799 wakeTableSharedLock(procedure, table); 800 } 801 return !hasLock; 802 } finally { 803 schedUnlock(); 804 } 805 } 806 807 /** 808 * Wake the procedures waiting for the specified region 809 * @param procedure the procedure that was holding the region 810 * @param regionInfo the region the procedure was holding 811 */ 812 public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 813 wakeRegions(procedure, regionInfo.getTable(), regionInfo); 814 } 815 816 /** 817 * Wake the procedures waiting for the specified regions 818 * @param procedure the procedure that was holding the regions 819 * @param regionInfos the list of regions the procedure was holding 820 */ 821 public void wakeRegions(final Procedure<?> procedure, final TableName table, 822 final RegionInfo... 
regionInfos) { 823 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 824 schedLock(); 825 try { 826 int numProcs = 0; 827 final Procedure<?>[] nextProcs = new Procedure[regionInfos.length]; 828 for (int i = 0; i < regionInfos.length; ++i) { 829 assert regionInfos[i].getTable().equals(table); 830 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 831 : "duplicate region: " + regionInfos[i]; 832 833 LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName()); 834 if (regionLock.releaseExclusiveLock(procedure)) { 835 if (!regionLock.isWaitingQueueEmpty()) { 836 // release one procedure at the time since regions has an xlock 837 nextProcs[numProcs++] = regionLock.removeFirst(); 838 } else { 839 locking.removeRegionLock(regionInfos[i].getEncodedName()); 840 } 841 } 842 } 843 844 // awake procedures if any 845 for (int i = numProcs - 1; i >= 0; --i) { 846 wakeProcedure(nextProcs[i]); 847 } 848 wakePollIfNeeded(numProcs); 849 // release the table shared-lock. 850 wakeTableSharedLock(procedure, table); 851 } finally { 852 schedUnlock(); 853 } 854 } 855 856 // ============================================================================ 857 // Namespace Locking Helpers 858 // ============================================================================ 859 /** 860 * Suspend the procedure if the specified namespace is already locked. 
861 * @see #wakeNamespaceExclusiveLock(Procedure,String) 862 * @param procedure the procedure trying to acquire the lock 863 * @param namespace Namespace to lock 864 * @return true if the procedure has to wait for the namespace to be available 865 */ 866 public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) { 867 schedLock(); 868 try { 869 final LockAndQueue systemNamespaceTableLock = 870 locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); 871 if (!systemNamespaceTableLock.trySharedLock(procedure)) { 872 waitProcedure(systemNamespaceTableLock, procedure); 873 logLockedResource(LockedResourceType.TABLE, 874 TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString()); 875 return true; 876 } 877 878 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 879 if (!namespaceLock.tryExclusiveLock(procedure)) { 880 systemNamespaceTableLock.releaseSharedLock(); 881 waitProcedure(namespaceLock, procedure); 882 logLockedResource(LockedResourceType.NAMESPACE, namespace); 883 return true; 884 } 885 return false; 886 } finally { 887 schedUnlock(); 888 } 889 } 890 891 /** 892 * Wake the procedures waiting for the specified namespace 893 * @see #waitNamespaceExclusiveLock(Procedure,String) 894 * @param procedure the procedure releasing the lock 895 * @param namespace the namespace that has the exclusive lock 896 */ 897 public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) { 898 schedLock(); 899 try { 900 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 901 final LockAndQueue systemNamespaceTableLock = 902 locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); 903 int waitingCount = 0; 904 if (namespaceLock.releaseExclusiveLock(procedure)) { 905 waitingCount += wakeWaitingProcedures(namespaceLock); 906 } 907 if (systemNamespaceTableLock.releaseSharedLock()) { 908 addToRunQueue(tableRunQueue, 909 
getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME), 910 () -> procedure + " released namespace exclusive lock"); 911 waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); 912 } 913 wakePollIfNeeded(waitingCount); 914 } finally { 915 schedUnlock(); 916 } 917 } 918 919 // ============================================================================ 920 // Server Locking Helpers 921 // ============================================================================ 922 /** 923 * Try to acquire the exclusive lock on the specified server. 924 * @see #wakeServerExclusiveLock(Procedure,ServerName) 925 * @param procedure the procedure trying to acquire the lock 926 * @param serverName Server to lock 927 * @return true if the procedure has to wait for the server to be available 928 */ 929 public boolean waitServerExclusiveLock(final Procedure<?> procedure, 930 final ServerName serverName) { 931 schedLock(); 932 try { 933 final LockAndQueue lock = locking.getServerLock(serverName); 934 if (lock.tryExclusiveLock(procedure)) { 935 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 936 // so. 937 removeFromRunQueue(serverRunQueue, 938 getServerQueue(serverName, 939 procedure instanceof ServerProcedureInterface 940 ? 
(ServerProcedureInterface) procedure 941 : null), 942 () -> procedure + " held exclusive lock"); 943 return false; 944 } 945 waitProcedure(lock, procedure); 946 logLockedResource(LockedResourceType.SERVER, serverName.getServerName()); 947 return true; 948 } finally { 949 schedUnlock(); 950 } 951 } 952 953 /** 954 * Wake the procedures waiting for the specified server 955 * @see #waitServerExclusiveLock(Procedure,ServerName) 956 * @param procedure the procedure releasing the lock 957 * @param serverName the server that has the exclusive lock 958 */ 959 public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) { 960 schedLock(); 961 try { 962 final LockAndQueue lock = locking.getServerLock(serverName); 963 // Only SCP will acquire/release server lock so do not need to check the return value here. 964 lock.releaseExclusiveLock(procedure); 965 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 966 // so. 967 addToRunQueue(serverRunQueue, 968 getServerQueue(serverName, 969 procedure instanceof ServerProcedureInterface 970 ? (ServerProcedureInterface) procedure 971 : null), 972 () -> procedure + " released exclusive lock"); 973 int waitingCount = wakeWaitingProcedures(lock); 974 wakePollIfNeeded(waitingCount); 975 } finally { 976 schedUnlock(); 977 } 978 } 979 980 // ============================================================================ 981 // Peer Locking Helpers 982 // ============================================================================ 983 /** 984 * Try to acquire the exclusive lock on the specified peer. 
985 * @see #wakePeerExclusiveLock(Procedure, String) 986 * @param procedure the procedure trying to acquire the lock 987 * @param peerId peer to lock 988 * @return true if the procedure has to wait for the peer to be available 989 */ 990 public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) { 991 schedLock(); 992 try { 993 final LockAndQueue lock = locking.getPeerLock(peerId); 994 if (lock.tryExclusiveLock(procedure)) { 995 removeFromRunQueue(peerRunQueue, getPeerQueue(peerId), 996 () -> procedure + " held exclusive lock"); 997 return false; 998 } 999 waitProcedure(lock, procedure); 1000 logLockedResource(LockedResourceType.PEER, peerId); 1001 return true; 1002 } finally { 1003 schedUnlock(); 1004 } 1005 } 1006 1007 /** 1008 * Wake the procedures waiting for the specified peer 1009 * @see #waitPeerExclusiveLock(Procedure, String) 1010 * @param procedure the procedure releasing the lock 1011 * @param peerId the peer that has the exclusive lock 1012 */ 1013 public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) { 1014 schedLock(); 1015 try { 1016 final LockAndQueue lock = locking.getPeerLock(peerId); 1017 if (lock.releaseExclusiveLock(procedure)) { 1018 addToRunQueue(peerRunQueue, getPeerQueue(peerId), 1019 () -> procedure + " released exclusive lock"); 1020 int waitingCount = wakeWaitingProcedures(lock); 1021 wakePollIfNeeded(waitingCount); 1022 } 1023 } finally { 1024 schedUnlock(); 1025 } 1026 } 1027 1028 // ============================================================================ 1029 // Meta Locking Helpers 1030 // ============================================================================ 1031 /** 1032 * Try to acquire the exclusive lock on meta. 1033 * @see #wakeMetaExclusiveLock(Procedure) 1034 * @param procedure the procedure trying to acquire the lock 1035 * @return true if the procedure has to wait for meta to be available 1036 * @deprecated only used for {@link RecoverMetaProcedure}. 
Should be removed along with 1037 * {@link RecoverMetaProcedure}. 1038 */ 1039 @Deprecated 1040 public boolean waitMetaExclusiveLock(Procedure<?> procedure) { 1041 schedLock(); 1042 try { 1043 final LockAndQueue lock = locking.getMetaLock(); 1044 if (lock.tryExclusiveLock(procedure)) { 1045 removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock"); 1046 return false; 1047 } 1048 waitProcedure(lock, procedure); 1049 logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString()); 1050 return true; 1051 } finally { 1052 schedUnlock(); 1053 } 1054 } 1055 1056 /** 1057 * Wake the procedures waiting for meta. 1058 * @see #waitMetaExclusiveLock(Procedure) 1059 * @param procedure the procedure releasing the lock 1060 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 1061 * {@link RecoverMetaProcedure}. 1062 */ 1063 @Deprecated 1064 public void wakeMetaExclusiveLock(Procedure<?> procedure) { 1065 schedLock(); 1066 try { 1067 final LockAndQueue lock = locking.getMetaLock(); 1068 lock.releaseExclusiveLock(procedure); 1069 addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock"); 1070 int waitingCount = wakeWaitingProcedures(lock); 1071 wakePollIfNeeded(waitingCount); 1072 } finally { 1073 schedUnlock(); 1074 } 1075 } 1076 1077 // ============================================================================ 1078 // Global Locking Helpers 1079 // ============================================================================ 1080 /** 1081 * Try to acquire the share lock on global. 
1082 * @see #wakeGlobalExclusiveLock(Procedure, String) 1083 * @param procedure the procedure trying to acquire the lock 1084 * @return true if the procedure has to wait for global to be available 1085 */ 1086 public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) { 1087 schedLock(); 1088 try { 1089 final LockAndQueue lock = locking.getGlobalLock(globalId); 1090 if (lock.tryExclusiveLock(procedure)) { 1091 removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId), 1092 () -> procedure + " held shared lock"); 1093 return false; 1094 } 1095 waitProcedure(lock, procedure); 1096 logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING); 1097 return true; 1098 } finally { 1099 schedUnlock(); 1100 } 1101 } 1102 1103 /** 1104 * Wake the procedures waiting for global. 1105 * @see #waitGlobalExclusiveLock(Procedure, String) 1106 * @param procedure the procedure releasing the lock 1107 */ 1108 public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) { 1109 schedLock(); 1110 try { 1111 final LockAndQueue lock = locking.getGlobalLock(globalId); 1112 lock.releaseExclusiveLock(procedure); 1113 addToRunQueue(globalRunQueue, getGlobalQueue(globalId), 1114 () -> procedure + " released shared lock"); 1115 int waitingCount = wakeWaitingProcedures(lock); 1116 wakePollIfNeeded(waitingCount); 1117 } finally { 1118 schedUnlock(); 1119 } 1120 } 1121 1122 /** 1123 * For debugging. Expensive. 1124 */ 1125 public String dumpLocks() throws IOException { 1126 schedLock(); 1127 try { 1128 // TODO: Refactor so we stream out locks for case when millions; i.e. 
take a PrintWriter 1129 return this.locking.toString(); 1130 } finally { 1131 schedUnlock(); 1132 } 1133 } 1134 1135 private void serverBucketToString(ToStringBuilder builder, String queueName, Queue<?> queue) { 1136 int size = queueSize(queue); 1137 if (size != 0) { 1138 builder.append(queueName, queue); 1139 } 1140 } 1141 1142 @Override 1143 public String toString() { 1144 ToStringBuilder builder = 1145 new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(super.toString()); 1146 schedLock(); 1147 try { 1148 for (int i = 0; i < serverBuckets.length; i++) { 1149 serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]); 1150 } 1151 builder.append("tableMap", tableMap); 1152 builder.append("peerMap", peerMap); 1153 builder.append("metaMap", metaMap); 1154 builder.append("globalMap", globalMap); 1155 } finally { 1156 schedUnlock(); 1157 } 1158 return builder.build(); 1159 } 1160}