001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Optional; 027import java.util.function.Consumer; 028import java.util.function.Function; 029import java.util.function.Supplier; 030import org.apache.commons.lang3.builder.ToStringBuilder; 031import org.apache.commons.lang3.builder.ToStringStyle; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.TableExistsException; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.TableNotFoundException; 037import org.apache.hadoop.hbase.client.RegionInfo; 038import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; 039import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; 040import org.apache.hadoop.hbase.procedure2.LockAndQueue; 041import org.apache.hadoop.hbase.procedure2.LockStatus; 042import org.apache.hadoop.hbase.procedure2.LockedResource; 043import org.apache.hadoop.hbase.procedure2.LockedResourceType; 044import org.apache.hadoop.hbase.procedure2.Procedure; 045import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 046import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; 047import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; 048import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the 055 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the 056 * master operations can be executed concurrently, if they are operating on different tables (e.g. 057 * two create table procedures can be performed at the same time) or against two different servers; 058 * say two servers that crashed at about the same time. 059 * <p> 060 * Each procedure should implement an Interface providing information for this queue. For example 061 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed 062 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can 063 * abort all the operations preceding a delete table, or similar. 064 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue, 065 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). This mainly 066 * includes:<br> 067 * <ul> 068 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when: 069 * <ol> 070 * <li>Queue was empty before push (so must have been out of run-queue)</li> 071 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have 072 * moved Queue out of run-queue)</li> 073 * </ol> 074 * </li> 075 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when: 076 * <ol> 077 * <li>Queue becomes empty after poll</li> 078 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the 079 * procedure)</li> 080 * <li>Exclusive lock is requested but lock is not available (returns null)</li> 081 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a 082 * child</li> 083 * </ol> 084 * </li> 085 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is: 086 * <ol> 087 * <li>Exclusive lock</li> 088 * <li>Last shared lock (in case queue was removed because next procedure in queue required 089 * exclusive lock)</li> 090 * </ol> 091 * </li> 092 * </ul> 093 */ 094@InterfaceAudience.Private 095public class MasterProcedureScheduler extends AbstractProcedureScheduler { 096 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); 097 098 private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR = 099 (n, k) -> n.compareKey((ServerName) k); 100 private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR = 101 (n, k) -> n.compareKey((TableName) k); 102 private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR = 103 (n, k) -> n.compareKey((String) k); 104 private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = 105 (n, k) -> n.compareKey((TableName) k); 106 private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR = 107 (n, k) -> n.compareKey((String) k); 108 109 private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); 110 private final FairQueue<TableName> tableRunQueue = new FairQueue<>(); 111 private final FairQueue<String> peerRunQueue = new FairQueue<>(); 112 private final FairQueue<TableName> metaRunQueue = new FairQueue<>(); 113 private final FairQueue<String> globalRunQueue = new FairQueue<>(); 114 115 private final ServerQueue[] serverBuckets = new ServerQueue[128]; 116 private TableQueue tableMap = null; 117 private PeerQueue peerMap = null; 118 private MetaQueue metaMap = null; 119 private GlobalQueue globalMap = null; 120 121 private final Function<Long, Procedure<?>> procedureRetriever; 122 private final SchemaLocking locking; 123 124 // To prevent multiple Create/Modify/Disable/Enable table procedure run at the same time, we will 125 // keep table procedure in this queue first before actually enqueuing it to tableQueue 126 // Seee HBASE-28683 for more details 127 private final Map<TableName, TableProcedureWaitingQueue> tableProcsWaitingEnqueue = 128 new HashMap<>(); 129 130 public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) { 131 this.procedureRetriever = procedureRetriever; 132 locking = new SchemaLocking(procedureRetriever); 133 } 134 135 @Override 136 public void yield(final Procedure proc) { 137 push(proc, false, true); 138 } 139 140 private boolean shouldWaitBeforeEnqueuing(TableProcedureInterface proc) { 141 return TableQueue.requireTableExclusiveLock(proc); 142 } 143 144 @Override 145 protected void enqueue(final Procedure proc, final boolean addFront) { 146 if (isMetaProcedure(proc)) { 147 doAdd(metaRunQueue, getMetaQueue(), proc, addFront); 148 } else if (isTableProcedure(proc)) { 149 TableProcedureInterface tableProc = (TableProcedureInterface) proc; 150 if (shouldWaitBeforeEnqueuing(tableProc)) { 151 TableProcedureWaitingQueue waitingQueue = tableProcsWaitingEnqueue.computeIfAbsent( 152 tableProc.getTableName(), k -> new TableProcedureWaitingQueue(procedureRetriever)); 153 if (!waitingQueue.procedureSubmitted(proc)) { 154 // there is a table procedure for this table already enqueued, waiting 155 LOG.debug("There is already a procedure running for table {}, added {} to waiting queue", 156 tableProc.getTableName(), proc); 157 return; 158 } 159 } 160 doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); 161 } else if (isServerProcedure(proc)) { 162 ServerProcedureInterface spi = (ServerProcedureInterface) proc; 163 doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); 164 } else if (isPeerProcedure(proc)) { 165 doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); 166 } else if (isGlobalProcedure(proc)) { 167 doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront); 168 } else { 169 // TODO: at the moment we only have Table and Server procedures 170 // if you are implementing a non-table/non-server procedure, you have two options: create 171 // a group for all the non-table/non-server procedures or try to find a key for your 172 // non-table/non-server procedures and implement something similar to the TableRunQueue. 173 throw new UnsupportedOperationException( 174 "RQs for non-table/non-server procedures are not implemented yet: " + proc); 175 } 176 } 177 178 private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue, 179 Procedure<?> proc, boolean addFront) { 180 queue.add(proc, addFront); 181 // For the following conditions, we will put the queue back into execution 182 // 1. The procedure has already held the lock, or the lock has been restored when restarting, 183 // which means it can be executed immediately. 184 // 2. The exclusive lock for this queue has not been held. 185 // 3. The given procedure has the exclusive lock permission for this queue. 186 Supplier<String> reason = null; 187 if (proc.hasLock()) { 188 reason = () -> proc + " has lock"; 189 } else if (proc.isLockedWhenLoading()) { 190 reason = () -> proc + " restores lock when restarting"; 191 } else if (!queue.getLockStatus().hasExclusiveLock()) { 192 reason = () -> "the exclusive lock is not held by anyone when adding " + proc; 193 } else if (queue.getLockStatus().hasLockAccess(proc)) { 194 reason = () -> proc + " has the excusive lock access"; 195 } 196 if (reason != null) { 197 addToRunQueue(fairq, queue, reason); 198 } 199 } 200 201 @Override 202 protected boolean queueHasRunnables() { 203 return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables() 204 || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() 205 || peerRunQueue.hasRunnables(); 206 } 207 208 @Override 209 protected Procedure dequeue() { 210 // pull global first 211 Procedure<?> pollResult = doPoll(globalRunQueue); 212 // then meta procedure 213 if (pollResult == null) { 214 pollResult = doPoll(metaRunQueue); 215 } 216 // For now, let server handling have precedence over table handling; presumption is that it 217 // is more important handling crashed servers than it is running the 218 // enabling/disabling tables, etc. 219 if (pollResult == null) { 220 pollResult = doPoll(serverRunQueue); 221 } 222 if (pollResult == null) { 223 pollResult = doPoll(peerRunQueue); 224 } 225 if (pollResult == null) { 226 pollResult = doPoll(tableRunQueue); 227 } 228 return pollResult; 229 } 230 231 private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) { 232 LockStatus s = rq.getLockStatus(); 233 // if we have the lock access, we are ready 234 if (s.hasLockAccess(proc)) { 235 return true; 236 } 237 boolean xlockReq = rq.requireExclusiveLock(proc); 238 // if we need to hold the xlock, then we need to make sure that no one holds any lock, including 239 // the shared lock, otherwise, we just need to make sure that no one holds the xlock 240 return xlockReq ? !s.isLocked() : !s.hasExclusiveLock(); 241 } 242 243 private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) { 244 Queue<T> rq = fairq.poll(); 245 if (rq == null || !rq.isAvailable()) { 246 return null; 247 } 248 // loop until we find out a procedure which is ready to run, or if we have checked all the 249 // procedures, then we give up and remove the queue from run queue. 250 for (int i = 0, n = rq.size(); i < n; i++) { 251 Procedure<?> proc = rq.poll(); 252 if (isLockReady(proc, rq)) { 253 // the queue is empty, remove from run queue 254 if (rq.isEmpty()) { 255 removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc); 256 } 257 return proc; 258 } 259 // we are not ready to run, add back and try the next procedure 260 rq.add(proc, false); 261 } 262 // no procedure is ready for execution, remove from run queue 263 removeFromRunQueue(fairq, rq, () -> "no procedure can be executed"); 264 return null; 265 } 266 267 @Override 268 public List<LockedResource> getLocks() { 269 schedLock(); 270 try { 271 return locking.getLocks(); 272 } finally { 273 schedUnlock(); 274 } 275 } 276 277 @Override 278 public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) { 279 schedLock(); 280 try { 281 return locking.getLockResource(resourceType, resourceName); 282 } finally { 283 schedUnlock(); 284 } 285 } 286 287 @Override 288 public void clear() { 289 schedLock(); 290 try { 291 clearQueue(); 292 locking.clear(); 293 } finally { 294 schedUnlock(); 295 } 296 } 297 298 private void clearQueue() { 299 // Remove Servers 300 for (int i = 0; i < serverBuckets.length; ++i) { 301 clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); 302 serverBuckets[i] = null; 303 } 304 305 // Remove Tables 306 clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); 307 tableMap = null; 308 tableProcsWaitingEnqueue.clear(); 309 310 // Remove Peers 311 clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR); 312 peerMap = null; 313 314 // Remove Meta 315 clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR); 316 metaMap = null; 317 318 // Remove Global 319 clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR); 320 globalMap = null; 321 322 assert size() == 0 : "expected queue size to be 0, got " + size(); 323 } 324 325 private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap, 326 FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) { 327 while (treeMap != null) { 328 Queue<T> node = AvlTree.getFirst(treeMap); 329 treeMap = AvlTree.remove(treeMap, node.getKey(), comparator); 330 if (fairq != null) { 331 removeFromRunQueue(fairq, node, () -> "clear all queues"); 332 } 333 } 334 } 335 336 private int queueSize(Queue<?> head) { 337 int count = 0; 338 AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head); 339 while (iter.hasNext()) { 340 count += iter.next().size(); 341 } 342 return count; 343 } 344 345 @Override 346 protected int queueSize() { 347 int count = 0; 348 for (ServerQueue serverMap : serverBuckets) { 349 count += queueSize(serverMap); 350 } 351 count += queueSize(tableMap); 352 count += queueSize(peerMap); 353 count += queueSize(metaMap); 354 count += queueSize(globalMap); 355 for (TableProcedureWaitingQueue waitingQ : tableProcsWaitingEnqueue.values()) { 356 count += waitingQ.waitingSize(); 357 } 358 return count; 359 } 360 361 @Override 362 public void completionCleanup(final Procedure proc) { 363 if (isTableProcedure(proc)) { 364 TableProcedureInterface tableProc = (TableProcedureInterface) proc; 365 if (shouldWaitBeforeEnqueuing(tableProc)) { 366 schedLock(); 367 try { 368 TableProcedureWaitingQueue waitingQueue = 369 tableProcsWaitingEnqueue.get(tableProc.getTableName()); 370 if (waitingQueue != null) { 371 Optional<Procedure<?>> nextProc = waitingQueue.procedureCompleted(proc); 372 if (nextProc.isPresent()) { 373 // enqueue it 374 Procedure<?> next = nextProc.get(); 375 LOG.debug("{} completed, enqueue a new procedure {}", proc, next); 376 doAdd(tableRunQueue, getTableQueue(tableProc.getTableName()), next, false); 377 } else { 378 if (waitingQueue.isEmpty()) { 379 // there is no waiting procedures in it, remove 380 tableProcsWaitingEnqueue.remove(tableProc.getTableName()); 381 } 382 } 383 } else { 384 // this should not happen normally, warn it 385 LOG.warn("no waiting queue while completing {}, which should not happen", proc); 386 } 387 } finally { 388 schedUnlock(); 389 } 390 } 391 boolean tableDeleted; 392 if (proc.hasException()) { 393 Exception procEx = proc.getException().unwrapRemoteException(); 394 if (tableProc.getTableOperationType() == TableOperationType.CREATE) { 395 // create failed because the table already exist 396 tableDeleted = !(procEx instanceof TableExistsException); 397 } else { 398 // the operation failed because the table does not exist 399 tableDeleted = (procEx instanceof TableNotFoundException); 400 } 401 } else { 402 // the table was deleted 403 tableDeleted = (tableProc.getTableOperationType() == TableOperationType.DELETE); 404 } 405 if (tableDeleted) { 406 markTableAsDeleted(tableProc.getTableName(), proc); 407 } 408 } else if (proc instanceof PeerProcedureInterface) { 409 tryCleanupPeerQueue(getPeerId(proc), proc); 410 } else if (proc instanceof ServerProcedureInterface) { 411 tryCleanupServerQueue(getServerName(proc), proc); 412 } else if (proc instanceof GlobalProcedureInterface) { 413 tryCleanupGlobalQueue(getGlobalId(proc), proc); 414 } else { 415 // No cleanup for other procedure types, yet. 416 return; 417 } 418 } 419 420 private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue, 421 Supplier<String> reason) { 422 if (LOG.isTraceEnabled()) { 423 LOG.trace("Add {} to run queue because: {}", queue, reason.get()); 424 } 425 if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) { 426 fairq.add(queue); 427 } 428 } 429 430 private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, 431 Queue<T> queue, Supplier<String> reason) { 432 if (LOG.isTraceEnabled()) { 433 LOG.trace("Remove {} from run queue because: {}", queue, reason.get()); 434 } 435 if (AvlIterableList.isLinked(queue)) { 436 fairq.remove(queue); 437 } 438 } 439 440 // ============================================================================ 441 // Table Queue Lookup Helpers 442 // ============================================================================ 443 private TableQueue getTableQueue(TableName tableName) { 444 TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); 445 if (node != null) { 446 return node; 447 } 448 449 node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName), 450 locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString())); 451 tableMap = AvlTree.insert(tableMap, node); 452 return node; 453 } 454 455 private void removeTableQueue(TableName tableName) { 456 tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); 457 locking.removeTableLock(tableName); 458 } 459 460 /** 461 * Tries to remove the queue and the table-lock of the specified table. If there are new 462 * operations pending (e.g. a new create), the remove will not be performed. 463 * @param table the name of the table that should be marked as deleted 464 * @param procedure the procedure that is removing the table 465 * @return true if deletion succeeded, false otherwise meaning that there are other new operations 466 * pending for that table (e.g. a new create). 467 */ 468 @RestrictedApi(explanation = "Should only be called in tests", link = "", 469 allowedOnPath = ".*/(MasterProcedureScheduler.java|src/test/.*)") 470 boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) { 471 return tryCleanupQueue(table, procedure, () -> tableMap, TABLE_QUEUE_KEY_COMPARATOR, 472 locking::getTableLock, tableRunQueue, this::removeTableQueue); 473 } 474 475 private static boolean isTableProcedure(Procedure<?> proc) { 476 return proc instanceof TableProcedureInterface; 477 } 478 479 private static TableName getTableName(Procedure<?> proc) { 480 return ((TableProcedureInterface) proc).getTableName(); 481 } 482 483 // ============================================================================ 484 // Server Queue Lookup Helpers 485 // ============================================================================ 486 private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { 487 final int index = getBucketIndex(serverBuckets, serverName.hashCode()); 488 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 489 if (node != null) { 490 return node; 491 } 492 int priority; 493 if (proc != null) { 494 priority = MasterProcedureUtil.getServerPriority(proc); 495 } else { 496 priority = 1; 497 } 498 node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); 499 serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); 500 return node; 501 } 502 503 private void removeServerQueue(ServerName serverName) { 504 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 505 serverBuckets[index] = 506 AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 507 locking.removeServerLock(serverName); 508 } 509 510 private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) { 511 // serverBuckets 512 tryCleanupQueue(serverName, proc, 513 () -> serverBuckets[getBucketIndex(serverBuckets, serverName.hashCode())], 514 SERVER_QUEUE_KEY_COMPARATOR, locking::getServerLock, serverRunQueue, this::removeServerQueue); 515 } 516 517 private static int getBucketIndex(Object[] buckets, int hashCode) { 518 return Math.abs(hashCode) % buckets.length; 519 } 520 521 private static boolean isServerProcedure(Procedure<?> proc) { 522 return proc instanceof ServerProcedureInterface; 523 } 524 525 private static ServerName getServerName(Procedure<?> proc) { 526 return ((ServerProcedureInterface) proc).getServerName(); 527 } 528 529 // ============================================================================ 530 // Peer Queue Lookup Helpers 531 // ============================================================================ 532 private PeerQueue getPeerQueue(String peerId) { 533 PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 534 if (node != null) { 535 return node; 536 } 537 node = new PeerQueue(peerId, locking.getPeerLock(peerId)); 538 peerMap = AvlTree.insert(peerMap, node); 539 return node; 540 } 541 542 private void removePeerQueue(String peerId) { 543 peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 544 locking.removePeerLock(peerId); 545 } 546 547 private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) { 548 tryCleanupQueue(peerId, procedure, () -> peerMap, PEER_QUEUE_KEY_COMPARATOR, 549 locking::getPeerLock, peerRunQueue, this::removePeerQueue); 550 } 551 552 private static boolean isPeerProcedure(Procedure<?> proc) { 553 return proc instanceof PeerProcedureInterface; 554 } 555 556 private static String getPeerId(Procedure<?> proc) { 557 return ((PeerProcedureInterface) proc).getPeerId(); 558 } 559 560 // ============================================================================ 561 // Meta Queue Lookup Helpers 562 // ============================================================================ 563 private MetaQueue getMetaQueue() { 564 MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR); 565 if (node != null) { 566 return node; 567 } 568 node = new MetaQueue(locking.getMetaLock()); 569 metaMap = AvlTree.insert(metaMap, node); 570 return node; 571 } 572 573 private static boolean isMetaProcedure(Procedure<?> proc) { 574 return proc instanceof MetaProcedureInterface; 575 } 576 577 // ============================================================================ 578 // Global Queue Lookup Helpers 579 // ============================================================================ 580 private GlobalQueue getGlobalQueue(String globalId) { 581 GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); 582 if (node != null) { 583 return node; 584 } 585 node = new GlobalQueue(globalId, locking.getGlobalLock(globalId)); 586 globalMap = AvlTree.insert(globalMap, node); 587 return node; 588 } 589 590 private void removeGlobalQueue(String globalId) { 591 globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); 592 locking.removeGlobalLock(globalId); 593 } 594 595 private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) { 596 tryCleanupQueue(globalId, procedure, () -> globalMap, GLOBAL_QUEUE_KEY_COMPARATOR, 597 locking::getGlobalLock, globalRunQueue, this::removeGlobalQueue); 598 } 599 600 private static boolean isGlobalProcedure(Procedure<?> proc) { 601 return proc instanceof GlobalProcedureInterface; 602 } 603 604 private static String getGlobalId(Procedure<?> proc) { 605 return ((GlobalProcedureInterface) proc).getGlobalId(); 606 } 607 608 private <T extends Comparable<T>, TNode extends Queue<T>> boolean tryCleanupQueue(T id, 609 Procedure<?> proc, Supplier<TNode> getMap, AvlKeyComparator<TNode> comparator, 610 Function<T, LockAndQueue> getLock, FairQueue<T> runQueue, Consumer<T> removeQueue) { 611 schedLock(); 612 try { 613 Queue<T> queue = AvlTree.get(getMap.get(), id, comparator); 614 if (queue == null) { 615 return true; 616 } 617 618 LockAndQueue lock = getLock.apply(id); 619 if (queue.isEmpty() && lock.isWaitingQueueEmpty() && !lock.isLocked()) { 620 // 1. the queue is empty 621 // 2. no procedure is in the lock's waiting queue 622 // 3. no other one holds the lock. It is possible that someone else holds the lock, usually 623 // our parent procedures 624 // If we can meet all the above conditions, it is safe for us to remove the queue 625 removeFromRunQueue(runQueue, queue, () -> "clean up queue after " + proc + " completed"); 626 removeQueue.accept(id); 627 return true; 628 } else { 629 return false; 630 } 631 632 } finally { 633 schedUnlock(); 634 } 635 } 636 637 // ============================================================================ 638 // Table Locking Helpers 639 // ============================================================================ 640 /** 641 * Get lock info for a resource of specified type and name and log details 642 */ 643 private void logLockedResource(LockedResourceType resourceType, String resourceName) { 644 if (!LOG.isDebugEnabled()) { 645 return; 646 } 647 648 LockedResource lockedResource = getLockResource(resourceType, resourceName); 649 if (lockedResource != null) { 650 String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" 651 + lockedResource.getSharedLockCount(); 652 653 Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure(); 654 if (proc != null) { 655 msg += ", exclusively locked by procId=" + proc.getProcId(); 656 } 657 LOG.debug(msg); 658 } 659 } 660 661 /** 662 * Suspend the procedure if the specified table is already locked. Other operations in the 663 * table-queue will be executed after the lock is released. 664 * @param procedure the procedure trying to acquire the lock 665 * @param table Table to lock 666 * @return true if the procedure has to wait for the table to be available 667 */ 668 public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 669 schedLock(); 670 try { 671 final String namespace = table.getNamespaceAsString(); 672 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 673 final LockAndQueue tableLock = locking.getTableLock(table); 674 if (!namespaceLock.trySharedLock(procedure)) { 675 waitProcedure(namespaceLock, procedure); 676 logLockedResource(LockedResourceType.NAMESPACE, namespace); 677 return true; 678 } 679 if (!tableLock.tryExclusiveLock(procedure)) { 680 namespaceLock.releaseSharedLock(); 681 waitProcedure(tableLock, procedure); 682 logLockedResource(LockedResourceType.TABLE, table.getNameAsString()); 683 return true; 684 } 685 removeFromRunQueue(tableRunQueue, getTableQueue(table), 686 () -> procedure + " held the exclusive lock"); 687 return false; 688 } finally { 689 schedUnlock(); 690 } 691 } 692 693 /** 694 * Wake the procedures waiting for the specified table 695 * @param procedure the procedure releasing the lock 696 * @param table the name of the table that has the exclusive lock 697 */ 698 public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 699 schedLock(); 700 try { 701 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 702 final LockAndQueue tableLock = locking.getTableLock(table); 703 int waitingCount = 0; 704 if (tableLock.releaseExclusiveLock(procedure)) { 705 waitingCount += wakeWaitingProcedures(tableLock); 706 } 707 if (namespaceLock.releaseSharedLock()) { 708 waitingCount += wakeWaitingProcedures(namespaceLock); 709 } 710 addToRunQueue(tableRunQueue, getTableQueue(table), 711 () -> procedure + " released the exclusive lock"); 712 wakePollIfNeeded(waitingCount); 713 } finally { 714 schedUnlock(); 715 } 716 } 717 718 /** 719 * Suspend the procedure if the specified table is already locked. other "read" operations in the 720 * table-queue may be executed concurrently, 721 * @param procedure the procedure trying to acquire the lock 722 * @param table Table to lock 723 * @return true if the procedure has to wait for the table to be available 724 */ 725 public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) { 726 return waitTableQueueSharedLock(procedure, table) == null; 727 } 728 729 private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) { 730 schedLock(); 731 try { 732 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 733 final LockAndQueue tableLock = locking.getTableLock(table); 734 if (!namespaceLock.trySharedLock(procedure)) { 735 waitProcedure(namespaceLock, procedure); 736 return null; 737 } 738 739 if (!tableLock.trySharedLock(procedure)) { 740 namespaceLock.releaseSharedLock(); 741 waitProcedure(tableLock, procedure); 742 return null; 743 } 744 745 return getTableQueue(table); 746 } finally { 747 schedUnlock(); 748 } 749 } 750 751 /** 752 * Wake the procedures waiting for the specified table 753 * @param procedure the procedure releasing the lock 754 * @param table the name of the table that has the shared lock 755 */ 756 public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) { 757 schedLock(); 758 try { 759 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 760 final LockAndQueue tableLock = locking.getTableLock(table); 761 int waitingCount = 0; 762 if (tableLock.releaseSharedLock()) { 763 addToRunQueue(tableRunQueue, getTableQueue(table), 764 () -> procedure + " released the shared lock"); 765 waitingCount += wakeWaitingProcedures(tableLock); 766 } 767 if (namespaceLock.releaseSharedLock()) { 768 waitingCount += wakeWaitingProcedures(namespaceLock); 769 } 770 wakePollIfNeeded(waitingCount); 771 } finally { 772 schedUnlock(); 773 } 774 } 775 776 // ============================================================================ 777 // Region Locking Helpers 778 // ============================================================================ 779 /** 780 * Suspend the procedure if the specified region is already locked. 781 * @param procedure the procedure trying to acquire the lock on the region 782 * @param regionInfo the region we are trying to lock 783 * @return true if the procedure has to wait for the regions to be available 784 */ 785 public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 786 return waitRegions(procedure, regionInfo.getTable(), regionInfo); 787 } 788 789 /** 790 * Suspend the procedure if the specified set of regions are already locked. 791 * @param procedure the procedure trying to acquire the lock on the regions 792 * @param table the table name of the regions we are trying to lock 793 * @param regionInfos the list of regions we are trying to lock 794 * @return true if the procedure has to wait for the regions to be available 795 */ 796 public boolean waitRegions(final Procedure<?> procedure, final TableName table, 797 final RegionInfo... regionInfos) { 798 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 799 schedLock(); 800 try { 801 assert table != null; 802 if (waitTableSharedLock(procedure, table)) { 803 return true; 804 } 805 806 // acquire region xlocks or wait 807 boolean hasLock = true; 808 final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length]; 809 for (int i = 0; i < regionInfos.length; ++i) { 810 assert regionInfos[i] != null; 811 assert regionInfos[i].getTable() != null; 812 assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure; 813 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 814 : "duplicate region: " + regionInfos[i]; 815 816 regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName()); 817 if (!regionLocks[i].tryExclusiveLock(procedure)) { 818 LOG.info("Waiting on xlock for {} held by pid={}", procedure, 819 regionLocks[i].getExclusiveLockProcIdOwner()); 820 waitProcedure(regionLocks[i], procedure); 821 hasLock = false; 822 while (i-- > 0) { 823 regionLocks[i].releaseExclusiveLock(procedure); 824 } 825 break; 826 } else { 827 LOG.info("Took xlock for {}", procedure); 828 } 829 } 830 831 if (!hasLock) { 832 wakeTableSharedLock(procedure, table); 833 } 834 return !hasLock; 835 } finally { 836 schedUnlock(); 837 } 838 } 839 840 /** 841 * Wake the procedures waiting for the specified region 842 * @param procedure the procedure that was holding the region 843 * @param regionInfo the region the procedure was holding 844 */ 845 public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 846 wakeRegions(procedure, regionInfo.getTable(), regionInfo); 847 } 848 849 /** 850 * Wake the procedures waiting for the specified regions 851 * @param procedure the procedure that was holding the regions 852 * @param regionInfos the list of regions the procedure was holding 853 */ 854 public void wakeRegions(final Procedure<?> procedure, final TableName table, 855 final RegionInfo... regionInfos) { 856 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 857 schedLock(); 858 try { 859 int numProcs = 0; 860 final Procedure<?>[] nextProcs = new Procedure[regionInfos.length]; 861 for (int i = 0; i < regionInfos.length; ++i) { 862 assert regionInfos[i].getTable().equals(table); 863 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 864 : "duplicate region: " + regionInfos[i]; 865 866 LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName()); 867 if (regionLock.releaseExclusiveLock(procedure)) { 868 if (!regionLock.isWaitingQueueEmpty()) { 869 // release one procedure at the time since regions has an xlock 870 nextProcs[numProcs++] = regionLock.removeFirst(); 871 } else { 872 locking.removeRegionLock(regionInfos[i].getEncodedName()); 873 } 874 } 875 } 876 877 // awake procedures if any 878 for (int i = numProcs - 1; i >= 0; --i) { 879 wakeProcedure(nextProcs[i]); 880 } 881 wakePollIfNeeded(numProcs); 882 // release the table shared-lock. 883 wakeTableSharedLock(procedure, table); 884 } finally { 885 schedUnlock(); 886 } 887 } 888 889 // ============================================================================ 890 // Namespace Locking Helpers 891 // ============================================================================ 892 /** 893 * Suspend the procedure if the specified namespace is already locked. 894 * @see #wakeNamespaceExclusiveLock(Procedure,String) 895 * @param procedure the procedure trying to acquire the lock 896 * @param namespace Namespace to lock 897 * @return true if the procedure has to wait for the namespace to be available 898 */ 899 public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) { 900 schedLock(); 901 try { 902 final LockAndQueue systemNamespaceTableLock = 903 locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); 904 if (!systemNamespaceTableLock.trySharedLock(procedure)) { 905 waitProcedure(systemNamespaceTableLock, procedure); 906 logLockedResource(LockedResourceType.TABLE, 907 TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString()); 908 return true; 909 } 910 911 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 912 if (!namespaceLock.tryExclusiveLock(procedure)) { 913 systemNamespaceTableLock.releaseSharedLock(); 914 waitProcedure(namespaceLock, procedure); 915 logLockedResource(LockedResourceType.NAMESPACE, namespace); 916 return true; 917 } 918 return false; 919 } finally { 920 schedUnlock(); 921 } 922 } 923 924 /** 925 * Wake the procedures waiting for the specified namespace 926 * @see #waitNamespaceExclusiveLock(Procedure,String) 927 * @param procedure the procedure releasing the lock 928 * @param namespace the namespace that has the exclusive lock 929 */ 930 public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) { 931 schedLock(); 932 try { 933 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 934 final LockAndQueue systemNamespaceTableLock = 935 locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); 936 int waitingCount = 0; 937 if (namespaceLock.releaseExclusiveLock(procedure)) { 938 waitingCount += wakeWaitingProcedures(namespaceLock); 939 } 940 if (systemNamespaceTableLock.releaseSharedLock()) { 941 addToRunQueue(tableRunQueue, 942 getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME), 943 () -> procedure + " released namespace exclusive lock"); 944 waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); 945 } 946 wakePollIfNeeded(waitingCount); 947 } finally { 948 schedUnlock(); 949 } 950 } 951 952 // ============================================================================ 953 // Server Locking Helpers 954 // ============================================================================ 955 /** 956 * Try to acquire the exclusive lock on the specified server. 957 * @see #wakeServerExclusiveLock(Procedure,ServerName) 958 * @param procedure the procedure trying to acquire the lock 959 * @param serverName Server to lock 960 * @return true if the procedure has to wait for the server to be available 961 */ 962 public boolean waitServerExclusiveLock(final Procedure<?> procedure, 963 final ServerName serverName) { 964 schedLock(); 965 try { 966 final LockAndQueue lock = locking.getServerLock(serverName); 967 if (lock.tryExclusiveLock(procedure)) { 968 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 969 // so. 970 removeFromRunQueue(serverRunQueue, 971 getServerQueue(serverName, 972 procedure instanceof ServerProcedureInterface 973 ? (ServerProcedureInterface) procedure 974 : null), 975 () -> procedure + " held exclusive lock"); 976 return false; 977 } 978 waitProcedure(lock, procedure); 979 logLockedResource(LockedResourceType.SERVER, serverName.getServerName()); 980 return true; 981 } finally { 982 schedUnlock(); 983 } 984 } 985 986 /** 987 * Wake the procedures waiting for the specified server 988 * @see #waitServerExclusiveLock(Procedure,ServerName) 989 * @param procedure the procedure releasing the lock 990 * @param serverName the server that has the exclusive lock 991 */ 992 public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) { 993 schedLock(); 994 try { 995 final LockAndQueue lock = locking.getServerLock(serverName); 996 // Only SCP will acquire/release server lock so do not need to check the return value here. 997 lock.releaseExclusiveLock(procedure); 998 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 999 // so. 1000 addToRunQueue(serverRunQueue, 1001 getServerQueue(serverName, 1002 procedure instanceof ServerProcedureInterface 1003 ? (ServerProcedureInterface) procedure 1004 : null), 1005 () -> procedure + " released exclusive lock"); 1006 int waitingCount = wakeWaitingProcedures(lock); 1007 wakePollIfNeeded(waitingCount); 1008 } finally { 1009 schedUnlock(); 1010 } 1011 } 1012 1013 // ============================================================================ 1014 // Peer Locking Helpers 1015 // ============================================================================ 1016 /** 1017 * Try to acquire the exclusive lock on the specified peer. 1018 * @see #wakePeerExclusiveLock(Procedure, String) 1019 * @param procedure the procedure trying to acquire the lock 1020 * @param peerId peer to lock 1021 * @return true if the procedure has to wait for the peer to be available 1022 */ 1023 public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) { 1024 schedLock(); 1025 try { 1026 final LockAndQueue lock = locking.getPeerLock(peerId); 1027 if (lock.tryExclusiveLock(procedure)) { 1028 removeFromRunQueue(peerRunQueue, getPeerQueue(peerId), 1029 () -> procedure + " held exclusive lock"); 1030 return false; 1031 } 1032 waitProcedure(lock, procedure); 1033 logLockedResource(LockedResourceType.PEER, peerId); 1034 return true; 1035 } finally { 1036 schedUnlock(); 1037 } 1038 } 1039 1040 /** 1041 * Wake the procedures waiting for the specified peer 1042 * @see #waitPeerExclusiveLock(Procedure, String) 1043 * @param procedure the procedure releasing the lock 1044 * @param peerId the peer that has the exclusive lock 1045 */ 1046 public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) { 1047 schedLock(); 1048 try { 1049 final LockAndQueue lock = locking.getPeerLock(peerId); 1050 if (lock.releaseExclusiveLock(procedure)) { 1051 addToRunQueue(peerRunQueue, getPeerQueue(peerId), 1052 () -> procedure + " released exclusive lock"); 1053 int waitingCount = wakeWaitingProcedures(lock); 1054 wakePollIfNeeded(waitingCount); 1055 } 1056 } finally { 1057 schedUnlock(); 1058 } 1059 } 1060 1061 // ============================================================================ 1062 // Meta Locking Helpers 1063 // ============================================================================ 1064 /** 1065 * Try to acquire the exclusive lock on meta. 1066 * @see #wakeMetaExclusiveLock(Procedure) 1067 * @param procedure the procedure trying to acquire the lock 1068 * @return true if the procedure has to wait for meta to be available 1069 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 1070 * {@link RecoverMetaProcedure}. 1071 */ 1072 @Deprecated 1073 public boolean waitMetaExclusiveLock(Procedure<?> procedure) { 1074 schedLock(); 1075 try { 1076 final LockAndQueue lock = locking.getMetaLock(); 1077 if (lock.tryExclusiveLock(procedure)) { 1078 removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock"); 1079 return false; 1080 } 1081 waitProcedure(lock, procedure); 1082 logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString()); 1083 return true; 1084 } finally { 1085 schedUnlock(); 1086 } 1087 } 1088 1089 /** 1090 * Wake the procedures waiting for meta. 1091 * @see #waitMetaExclusiveLock(Procedure) 1092 * @param procedure the procedure releasing the lock 1093 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 1094 * {@link RecoverMetaProcedure}. 1095 */ 1096 @Deprecated 1097 public void wakeMetaExclusiveLock(Procedure<?> procedure) { 1098 schedLock(); 1099 try { 1100 final LockAndQueue lock = locking.getMetaLock(); 1101 lock.releaseExclusiveLock(procedure); 1102 addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock"); 1103 int waitingCount = wakeWaitingProcedures(lock); 1104 wakePollIfNeeded(waitingCount); 1105 } finally { 1106 schedUnlock(); 1107 } 1108 } 1109 1110 // ============================================================================ 1111 // Global Locking Helpers 1112 // ============================================================================ 1113 /** 1114 * Try to acquire the share lock on global. 1115 * @see #wakeGlobalExclusiveLock(Procedure, String) 1116 * @param procedure the procedure trying to acquire the lock 1117 * @return true if the procedure has to wait for global to be available 1118 */ 1119 public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) { 1120 schedLock(); 1121 try { 1122 final LockAndQueue lock = locking.getGlobalLock(globalId); 1123 if (lock.tryExclusiveLock(procedure)) { 1124 removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId), 1125 () -> procedure + " held shared lock"); 1126 return false; 1127 } 1128 waitProcedure(lock, procedure); 1129 logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING); 1130 return true; 1131 } finally { 1132 schedUnlock(); 1133 } 1134 } 1135 1136 /** 1137 * Wake the procedures waiting for global. 1138 * @see #waitGlobalExclusiveLock(Procedure, String) 1139 * @param procedure the procedure releasing the lock 1140 */ 1141 public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) { 1142 schedLock(); 1143 try { 1144 final LockAndQueue lock = locking.getGlobalLock(globalId); 1145 lock.releaseExclusiveLock(procedure); 1146 addToRunQueue(globalRunQueue, getGlobalQueue(globalId), 1147 () -> procedure + " released shared lock"); 1148 int waitingCount = wakeWaitingProcedures(lock); 1149 wakePollIfNeeded(waitingCount); 1150 } finally { 1151 schedUnlock(); 1152 } 1153 } 1154 1155 /** 1156 * For debugging. Expensive. 1157 */ 1158 public String dumpLocks() throws IOException { 1159 schedLock(); 1160 try { 1161 // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter 1162 return this.locking.toString(); 1163 } finally { 1164 schedUnlock(); 1165 } 1166 } 1167 1168 private void serverBucketToString(ToStringBuilder builder, String queueName, Queue<?> queue) { 1169 int size = queueSize(queue); 1170 if (size != 0) { 1171 builder.append(queueName, queue); 1172 } 1173 } 1174 1175 @Override 1176 public String toString() { 1177 ToStringBuilder builder = 1178 new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(super.toString()); 1179 schedLock(); 1180 try { 1181 for (int i = 0; i < serverBuckets.length; i++) { 1182 serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]); 1183 } 1184 builder.append("tableMap", tableMap); 1185 builder.append("tableWaitingMap", tableProcsWaitingEnqueue); 1186 builder.append("peerMap", peerMap); 1187 builder.append("metaMap", metaMap); 1188 builder.append("globalMap", globalMap); 1189 } finally { 1190 schedUnlock(); 1191 } 1192 return builder.build(); 1193 } 1194}