001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.util.function.Function; 022import java.util.function.Predicate; 023import java.util.stream.Stream; 024import org.apache.yetus.audience.InterfaceAudience; 025 026/** 027 * Locking for mutual exclusion between procedures. Used only by procedure framework internally. 028 * {@link LockAndQueue} has two purposes: 029 * <ol> 030 * <li>Acquire/release exclusive/shared locks.</li> 031 * <li>Maintains a list of procedures waiting on this lock. {@link LockAndQueue} extends 032 * {@link ProcedureDeque} class. Blocked Procedures are added to our super Deque. Using inheritance 033 * over composition to keep the Deque of waiting Procedures is unusual, but we do it this way 034 * because in certain cases, there will be millions of regions. This layout uses less memory. 035 * </ol> 036 * <p/> 037 * NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are 038 * guarded by schedLock(). <br/> 039 * There is no need of 'volatile' keyword for member variables because of memory synchronization 040 * guarantees of locks (see 'Memory Synchronization', 041 * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html) <br/> 042 * We do not implement Lock interface because we need exclusive and shared locking, and also because 043 * try-lock functions require procedure id. <br/> 044 * We do not use ReentrantReadWriteLock directly because of its high memory overhead. 045 */ 046@InterfaceAudience.Private 047public class LockAndQueue implements LockStatus { 048 049 private final Function<Long, Procedure<?>> procedureRetriever; 050 private final ProcedureDeque queue = new ProcedureDeque(); 051 private Procedure<?> exclusiveLockOwnerProcedure = null; 052 private int sharedLock = 0; 053 054 // ====================================================================== 055 // Lock Status 056 // ====================================================================== 057 058 public LockAndQueue(Function<Long, Procedure<?>> procedureRetriever) { 059 this.procedureRetriever = procedureRetriever; 060 } 061 062 @Override 063 public boolean hasExclusiveLock() { 064 return this.exclusiveLockOwnerProcedure != null; 065 } 066 067 @Override 068 public boolean hasLockAccess(Procedure<?> proc) { 069 if (exclusiveLockOwnerProcedure == null) { 070 return false; 071 } 072 long lockOwnerId = exclusiveLockOwnerProcedure.getProcId(); 073 if (proc.getProcId() == lockOwnerId) { 074 return true; 075 } 076 if (!proc.hasParent()) { 077 return false; 078 } 079 // fast path to check root procedure 080 if (proc.getRootProcId() == lockOwnerId) { 081 return true; 082 } 083 // check ancestors 084 for (Procedure<?> p = proc;;) { 085 if (p.getParentProcId() == lockOwnerId) { 086 return true; 087 } 088 p = procedureRetriever.apply(p.getParentProcId()); 089 if (p == null || !p.hasParent()) { 090 return false; 091 } 092 } 093 } 094 095 @Override 096 public Procedure<?> getExclusiveLockOwnerProcedure() { 097 return exclusiveLockOwnerProcedure; 098 } 099 100 @Override 101 public int getSharedLockCount() { 102 return sharedLock; 103 } 104 105 // ====================================================================== 106 // try/release Shared/Exclusive lock 107 // ====================================================================== 108 109 /** 110 * @return whether we have succesfully acquired the shared lock. 111 */ 112 public boolean trySharedLock(Procedure<?> proc) { 113 if (hasExclusiveLock() && !hasLockAccess(proc)) { 114 return false; 115 } 116 // If no one holds the xlock, then we are free to hold the sharedLock 117 // If the parent proc or we have already held the xlock, then we return true here as 118 // xlock is more powerful then shared lock. 119 sharedLock++; 120 return true; 121 } 122 123 /** 124 * @return whether we should wake the procedures waiting on the lock here. 125 */ 126 public boolean releaseSharedLock() { 127 // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our 128 // parent have held the xlock. And since there is still an exclusive lock, we do not need to 129 // wake any procedures. 130 return --sharedLock == 0 && !hasExclusiveLock(); 131 } 132 133 public boolean tryExclusiveLock(Procedure<?> proc) { 134 if (isLocked()) { 135 return hasLockAccess(proc); 136 } 137 exclusiveLockOwnerProcedure = proc; 138 return true; 139 } 140 141 /** 142 * @return whether we should wake the procedures waiting on the lock here. 143 */ 144 public boolean releaseExclusiveLock(Procedure<?> proc) { 145 if (exclusiveLockOwnerProcedure == null || 146 exclusiveLockOwnerProcedure.getProcId() != proc.getProcId()) { 147 // We are not the lock owner, it is probably inherited from the parent procedures. 148 return false; 149 } 150 exclusiveLockOwnerProcedure = null; 151 // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent 152 // proc or we have already held the xlock, and also allow releasing the locks in any order, so 153 // it could happen that the xlock is released but there are still some procs holding the shared 154 // lock. 155 // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which 156 // acquire the shared lock on the same lock. This is because we will schedule the sub proces 157 // before releasing the lock, so the sub procs could call acquire lock before we releasing the 158 // xlock. 159 return sharedLock == 0; 160 } 161 162 public boolean isWaitingQueueEmpty() { 163 return queue.isEmpty(); 164 } 165 166 public Procedure<?> removeFirst() { 167 return queue.removeFirst(); 168 } 169 170 public void addLast(Procedure<?> proc) { 171 queue.addLast(proc); 172 } 173 174 public int wakeWaitingProcedures(ProcedureScheduler scheduler) { 175 int count = queue.size(); 176 // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so 177 // that the procedure which was added first goes in the front for the scheduler queue. 178 scheduler.addFront(queue.descendingIterator()); 179 queue.clear(); 180 return count; 181 } 182 183 @SuppressWarnings("rawtypes") 184 public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) { 185 return queue.stream().filter(predicate); 186 } 187 188 @Override 189 public String toString() { 190 return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") + 191 ", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size(); 192 } 193}