001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.util.function.Function; 021import java.util.function.Predicate; 022import java.util.stream.Stream; 023import org.apache.yetus.audience.InterfaceAudience; 024 025/** 026 * Locking for mutual exclusion between procedures. Used only by procedure framework internally. 027 * {@link LockAndQueue} has two purposes: 028 * <ol> 029 * <li>Acquire/release exclusive/shared locks.</li> 030 * <li>Maintains a list of procedures waiting on this lock. {@link LockAndQueue} extends 031 * {@link ProcedureDeque} class. Blocked Procedures are added to our super Deque. Using inheritance 032 * over composition to keep the Deque of waiting Procedures is unusual, but we do it this way 033 * because in certain cases, there will be millions of regions. This layout uses less memory. 034 * </ol> 035 * <p/> 036 * NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are 037 * guarded by schedLock(). <br/> 038 * There is no need of 'volatile' keyword for member variables because of memory synchronization 039 * guarantees of locks (see 'Memory Synchronization', 040 * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html) <br/> 041 * We do not implement Lock interface because we need exclusive and shared locking, and also because 042 * try-lock functions require procedure id. <br/> 043 * We do not use ReentrantReadWriteLock directly because of its high memory overhead. 044 */ 045@InterfaceAudience.Private 046public class LockAndQueue implements LockStatus { 047 048 private final Function<Long, Procedure<?>> procedureRetriever; 049 private final ProcedureDeque queue = new ProcedureDeque(); 050 private Procedure<?> exclusiveLockOwnerProcedure = null; 051 private int sharedLock = 0; 052 053 // ====================================================================== 054 // Lock Status 055 // ====================================================================== 056 057 public LockAndQueue(Function<Long, Procedure<?>> procedureRetriever) { 058 this.procedureRetriever = procedureRetriever; 059 } 060 061 @Override 062 public boolean hasExclusiveLock() { 063 return this.exclusiveLockOwnerProcedure != null; 064 } 065 066 @Override 067 public boolean hasLockAccess(Procedure<?> proc) { 068 if (exclusiveLockOwnerProcedure == null) { 069 return false; 070 } 071 long lockOwnerId = exclusiveLockOwnerProcedure.getProcId(); 072 if (proc.getProcId() == lockOwnerId) { 073 return true; 074 } 075 if (!proc.hasParent()) { 076 return false; 077 } 078 // fast path to check root procedure 079 if (proc.getRootProcId() == lockOwnerId) { 080 return true; 081 } 082 // check ancestors 083 for (Procedure<?> p = proc;;) { 084 if (p.getParentProcId() == lockOwnerId) { 085 return true; 086 } 087 p = procedureRetriever.apply(p.getParentProcId()); 088 if (p == null || !p.hasParent()) { 089 return false; 090 } 091 } 092 } 093 094 @Override 095 public Procedure<?> getExclusiveLockOwnerProcedure() { 096 return exclusiveLockOwnerProcedure; 097 } 098 099 @Override 100 public int getSharedLockCount() { 101 return sharedLock; 102 } 103 104 // ====================================================================== 105 // try/release Shared/Exclusive lock 106 // ====================================================================== 107 108 /** Returns whether we have succesfully acquired the shared lock. */ 109 public boolean trySharedLock(Procedure<?> proc) { 110 if (hasExclusiveLock() && !hasLockAccess(proc)) { 111 return false; 112 } 113 // If no one holds the xlock, then we are free to hold the sharedLock 114 // If the parent proc or we have already held the xlock, then we return true here as 115 // xlock is more powerful then shared lock. 116 sharedLock++; 117 return true; 118 } 119 120 /** Returns whether we should wake the procedures waiting on the lock here. */ 121 public boolean releaseSharedLock() { 122 // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our 123 // parent have held the xlock. And since there is still an exclusive lock, we do not need to 124 // wake any procedures. 125 return --sharedLock == 0 && !hasExclusiveLock(); 126 } 127 128 public boolean tryExclusiveLock(Procedure<?> proc) { 129 if (isLocked()) { 130 return hasLockAccess(proc); 131 } 132 exclusiveLockOwnerProcedure = proc; 133 return true; 134 } 135 136 /** Returns whether we should wake the procedures waiting on the lock here. */ 137 public boolean releaseExclusiveLock(Procedure<?> proc) { 138 if ( 139 exclusiveLockOwnerProcedure == null 140 || exclusiveLockOwnerProcedure.getProcId() != proc.getProcId() 141 ) { 142 // We are not the lock owner, it is probably inherited from the parent procedures. 143 return false; 144 } 145 exclusiveLockOwnerProcedure = null; 146 // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent 147 // proc or we have already held the xlock, and also allow releasing the locks in any order, so 148 // it could happen that the xlock is released but there are still some procs holding the shared 149 // lock. 150 // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which 151 // acquire the shared lock on the same lock. This is because we will schedule the sub proces 152 // before releasing the lock, so the sub procs could call acquire lock before we releasing the 153 // xlock. 154 return sharedLock == 0; 155 } 156 157 public boolean isWaitingQueueEmpty() { 158 return queue.isEmpty(); 159 } 160 161 public Procedure<?> removeFirst() { 162 return queue.removeFirst(); 163 } 164 165 public void addLast(Procedure<?> proc) { 166 queue.addLast(proc); 167 } 168 169 public int wakeWaitingProcedures(ProcedureScheduler scheduler) { 170 int count = queue.size(); 171 // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so 172 // that the procedure which was added first goes in the front for the scheduler queue. 173 scheduler.addFront(queue.descendingIterator()); 174 queue.clear(); 175 return count; 176 } 177 178 @SuppressWarnings("rawtypes") 179 public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) { 180 return queue.stream().filter(predicate); 181 } 182 183 @Override 184 public String toString() { 185 return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") 186 + ", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size(); 187 } 188}